Adding sync block macro

This commit is contained in:
2026-03-12 15:24:03 +01:00
parent 008a765fe3
commit e13b4a49b5
7 changed files with 320 additions and 25 deletions

View File

@ -1,4 +1,4 @@
use crate::edge::BlockIOIndex;
use crate::edge::{AnonymousStreamConsumer, AnonymousStreamProducer, BlockIOIndex};
pub enum BlockResult
{
@ -16,8 +16,18 @@ pub trait BlockIO
fn set_index(&self, block_index: usize);
// Number of input/output ports
fn input_count(&self);
fn output_count(&self);
fn input_count(&self) -> usize;
fn output_count(&self) -> usize;
// Stream managment
fn set_anonymous_out_stream(&mut self, output_index: usize, producer: AnonymousStreamProducer);
fn set_anonymous_in_stream(&mut self, input_index: usize, consumer: AnonymousStreamConsumer);
fn create_anonymous_stream_for(
&mut self,
output_index: usize,
capacity: usize,
) -> (AnonymousStreamProducer, AnonymousStreamConsumer);
}
pub trait Block
@ -25,4 +35,12 @@ pub trait Block
fn work(&mut self) -> BlockResult;
}
pub trait SyncBlock
{
type Input;
type Output;
fn sync_work(&mut self, input: Self::Input) -> Self::Output;
}
pub trait GraphableBlock: Block + BlockIO {}

View File

@ -1,8 +1,12 @@
use std::any::Any;
use std::sync::Arc;
use std::sync::Mutex;
use crate::stream;
use crate::stream::StreamConsumer;
use crate::stream::StreamProducer;
use crate::stream::StreamReader;
use crate::stream::StreamWriter;
pub struct Edge
{
@ -22,15 +26,55 @@ pub struct BlockIOIndex
pub port_index: usize,
}
pub struct In<T>
// Needed for graph to be able to manipulate
// stream endings without knowing the generic type
pub struct AnonymousStreamProducer
{
stream: Option<StreamProducer<T>>,
// Will rarely be accessed
edge: Arc<Mutex<Edge>>,
inner: Box<dyn Any>,
}
pub struct Out<T>
pub struct AnonymousStreamConsumer
{
inner: Box<dyn Any>,
}
impl<T: 'static> From<StreamProducer<T>> for AnonymousStreamProducer
{
fn from(value: StreamProducer<T>) -> Self
{
AnonymousStreamProducer {
inner: Box::new(value),
}
}
}
impl<T: 'static> From<StreamConsumer<T>> for AnonymousStreamConsumer
{
fn from(value: StreamConsumer<T>) -> Self
{
AnonymousStreamConsumer {
inner: Box::new(value),
}
}
}
impl AnonymousStreamProducer
{
pub fn downcast<T: 'static>(self) -> StreamProducer<T>
{
*self.inner.downcast::<StreamProducer<T>>().unwrap()
}
}
impl AnonymousStreamConsumer
{
pub fn downcast<T: 'static>(self) -> StreamConsumer<T>
{
*self.inner.downcast::<StreamConsumer<T>>().unwrap()
}
}
pub struct In<T>
{
stream: Option<StreamConsumer<T>>,
@ -38,7 +82,15 @@ pub struct Out<T>
edge: Arc<Mutex<Edge>>,
}
impl<T> In<T>
pub struct Out<T>
{
stream: Option<StreamProducer<T>>,
// Will rarely be accessed
edge: Arc<Mutex<Edge>>,
}
impl<T: 'static> In<T>
{
pub fn set_block_index(&self, index: BlockIOIndex)
{
@ -49,9 +101,19 @@ impl<T> In<T>
{
self.edge.lock().unwrap().from
}
pub fn set_anonymous_stream(&mut self, consumer: AnonymousStreamConsumer)
{
self.stream = Some(consumer.downcast::<T>())
}
pub fn read<'a>(&'a mut self) -> StreamReader<'a, T>
{
self.stream.as_mut().unwrap().read()
}
}
impl<T> Out<T>
impl<T: 'static> Out<T>
{
pub fn set_block_index(&self, index: BlockIOIndex)
{
@ -62,4 +124,25 @@ impl<T> Out<T>
{
self.edge.lock().unwrap().to
}
pub fn set_anonymous_stream(&mut self, producer: AnonymousStreamProducer)
{
self.stream = Some(producer.downcast::<T>())
}
// Delegate stream creation to Out object
// which knows the stream type
pub fn create_anonymous_stream(
&self,
capacity: usize,
) -> (AnonymousStreamProducer, AnonymousStreamConsumer)
{
let (tx, rx) = stream::bounded_queue::<T>(capacity);
(tx.into(), rx.into())
}
pub fn write<'a>(&'a mut self) -> StreamWriter<'a, T>
{
self.stream.as_mut().unwrap().write()
}
}

View File

@ -1,7 +1,11 @@
use oxydsp_flowgraph::edge::{In, Out};
use oxydsp_flowgraph_macros::BlockIO;
use oxydsp_flowgraph::{
block::SyncBlock,
edge::{In, Out},
};
use oxydsp_flowgraph_macros::{BlockIO, sync_block};
#[derive(BlockIO)]
//#[sync_block]
pub struct Test
{
#[input]
@ -11,4 +15,35 @@ pub struct Test
output: Out<u32>,
}
impl oxydsp_flowgraph::block::Block for Test
{
fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult
{
let mut len = usize::MAX;
let mut input_reader = self.input.read();
len = len.min(input_reader.len());
let mut output_writer = self.output.write();
len = len.min(output_writer.len());
for _ in 0..len
{
let input = input_reader.pop().unwrap();
let (output_out,) = self.sync_work((input,));
output_writer.push(output_out).unwrap();
}
oxydsp_flowgraph::block::BlockResult::Ok
}
}
impl SyncBlock for Test
{
type Input = (u32,);
type Output = (u32,);
fn sync_work(&mut self, (num,): Self::Input) -> Self::Output
{
(num,)
}
}
fn main() {}

View File

@ -56,8 +56,8 @@ unsafe impl<T> Sync for StreamConsumer<T> {}
pub struct StreamWriter<'a, T>
{
producer: &'a StreamProducer<T>,
first: &'a mut [MaybeUninit<T>],
second: Option<&'a mut [MaybeUninit<T>]>,
first: &'a UnsafeCell<[MaybeUninit<T>]>,
second: Option<&'a UnsafeCell<[MaybeUninit<T>]>>,
written: usize,
}
@ -144,6 +144,14 @@ impl<T> StreamProducer<T>
let (start_to_tail, _tail_to_head) =
start_to_head.split_at_mut_unchecked(wrapped_tail);
// Slices are wrapped into unsafe cells to provide interior mutability
// On the stream as it is much more convienient.
//
// SAFETY:
//
// This functions borrows the stream mutably. As such, only one instance
// of StreamWriter can exist for a given stream. The StreamWriter
// is thus the only on able to write or read the stream when it lives
let first = head_to_end;
let second = Some(start_to_tail);
StreamWriter {
@ -354,7 +362,7 @@ impl<'a, T> StreamWriter<'a, T>
self.len() == 0
}
pub fn push(&mut self, element: T) -> Result<(), T>
pub fn push(&self, element: T) -> Result<(), T>
{
if self.written < self.first.len()
{