Adds pulse shaping, work stealing

This commit is contained in:
2026-04-09 16:33:42 +02:00
parent 4d548a7973
commit 54f26a0dd2
32 changed files with 1305 additions and 340 deletions

View File

@ -4,4 +4,5 @@ version = "0.1.0"
edition = "2024"
[dependencies]
crossbeam-deque = "0.8.6"
oxydsp-flowgraph-macros = { path = "./oxydsp-flowgraph-macros" }

View File

@ -1,19 +1,11 @@
use proc_macro::TokenStream;
use zyn::FromInput;
use zyn::ToTokens;
use zyn::ext::AttrExt;
use zyn::ext::FieldsExt;
use zyn::ext::ItemExt;
use zyn::format_ident;
use zyn::ident;
use zyn::parse_input;
use zyn::syn::Attribute;
use zyn::syn::GenericParam;
use zyn::syn::Index;
use zyn::syn::Lit;
use zyn::syn::TypeGenerics;
use zyn::syn::parse_quote;
use zyn::syn::punctuated::Punctuated;
use zyn::syn::spanned::Spanned;
mod sync;
@ -52,53 +44,67 @@ pub fn block_io(
impl {{impl_generics}} oxydsp_flowgraph::block::BlockIO for {{ ident.clone() }} {{ type_generics }}
{{ where_clause }}
{
@block_io_set_index(fields = fields.clone())
@block_io_get_successors(fields = fields.clone())
@block_io_counts(fields = fields.clone())
@block_io_set_streams(fields = fields.clone())
@block_io_create_stream(fields = fields.clone())
@block_io_get_inputs(fields = fields.clone())
@block_io_get_outputs(fields = fields.clone())
@block_io_get_meta(ident = ident.clone(), fields = fields.clone())
}
)
}
#[zyn::element]
fn block_io_set_index(fields: zyn::syn::Fields) -> zyn::TokenStream
fn block_io_get_inputs(fields: zyn::syn::Fields) -> zyn::TokenStream
{
let fields = fields.as_named().unwrap().named.clone();
zyn::zyn!(
fn set_index(&self, block_index: usize)
fn get_inputs_mut(&mut self) -> Vec<&mut dyn oxydsp_flowgraph::io::AnonymousIn>
{
use oxydsp_flowgraph::edge::BlockIOIndex;
let mut acc = vec![];
use oxydsp_flowgraph::block::BlockInput;
@for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("input"))).enumerate())
{
self.{{field.1.ident}}.set_block_index(BlockIOIndex {block_index, port_index: {{ field.0 }} });
acc.extend(self.{{field.1.ident}}.get_inputs_mut());
}
acc
}
@for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("output"))).enumerate())
fn get_inputs(&self) -> Vec<&dyn oxydsp_flowgraph::io::AnonymousIn>
{
let mut acc = vec![];
use oxydsp_flowgraph::block::BlockInput;
@for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("input"))).enumerate())
{
self.{{field.1.ident}}.set_block_index(BlockIOIndex {block_index, port_index: {{ field.0 }} });
acc.extend(self.{{field.1.ident}}.get_inputs());
}
acc
}
)
}
#[zyn::element]
fn block_io_get_successors(fields: zyn::syn::Fields) -> zyn::TokenStream
fn block_io_get_outputs(fields: zyn::syn::Fields) -> zyn::TokenStream
{
let fields = fields.as_named().unwrap().named.clone();
zyn::zyn!(
fn get_successors(&self) -> Vec<oxydsp_flowgraph::edge::BlockIOIndex>
fn get_outputs_mut(&mut self) -> Vec<&mut dyn oxydsp_flowgraph::io::AnonymousOut>
{
let mut output = vec![];
let mut acc = vec![];
use oxydsp_flowgraph::block::BlockOutput;
@for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("output"))).enumerate())
{
if let Some(block_index) = self.{{ field.1.ident }}.get_consumer_block()
{
output.push(block_index);
}
acc.extend(self.{{field.1.ident}}.get_outputs_mut());
}
output
acc
}
fn get_outputs(&self) -> Vec<&dyn oxydsp_flowgraph::io::AnonymousOut>
{
let mut acc = vec![];
use oxydsp_flowgraph::block::BlockOutput;
@for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("output"))).enumerate())
{
acc.extend(self.{{field.1.ident}}.get_outputs());
}
acc
}
)
}
@ -144,94 +150,6 @@ fn block_io_get_meta(ident: zyn::syn::Ident, fields: zyn::syn::Fields) -> zyn::T
)
}
#[zyn::element]
fn block_io_counts(fields: zyn::syn::Fields) -> zyn::TokenStream
{
let fields = fields.as_named().unwrap().named.clone();
let input_count = fields
.iter()
.filter(|x| x.attrs.iter().any(|x| x.is("input")))
.count();
let output_count = fields
.iter()
.filter(|x| x.attrs.iter().any(|x| x.is("output")))
.count();
zyn::zyn!(
fn input_count(&self) -> usize
{
return { { input_count } };
}
fn output_count(&self) -> usize
{
return { { output_count } };
}
)
}
#[zyn::element(debug = "pretty")]
fn block_io_set_streams(fields: zyn::syn::Fields) -> zyn::TokenStream
{
zyn::zyn!(
#[allow(unreachable_code)]
fn set_anonymous_out_stream(
&mut self,
output_index: usize,
producer: oxydsp_flowgraph::io::AnonymousStreamProducer,
)
{
match output_index
{
@for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("output"))).enumerate())
{
{{ field.0 }} => self.{{field.1.ident}}.set_anonymous_stream(producer),
}
_ => panic!("output_index out of bounds.")
};
}
#[allow(unreachable_code)]
fn set_anonymous_in_stream(&mut self, input_index: usize, consumer: oxydsp_flowgraph::io::AnonymousStreamConsumer)
{
match input_index
{
@for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("input"))).enumerate())
{
{{ field.0 }} => self.{{field.1.ident}}.set_anonymous_stream(consumer),
}
_ => panic!("output_index out of bounds.")
};
}
)
}
#[zyn::element]
fn block_io_create_stream(fields: zyn::syn::Fields) -> zyn::TokenStream
{
zyn::zyn!(
#[allow(unreachable_code)]
fn create_anonymous_stream_for(
&mut self,
output_index: usize,
capacity: usize,
) -> (
oxydsp_flowgraph::io::AnonymousStreamProducer,
oxydsp_flowgraph::io::AnonymousStreamConsumer,
)
{
let output = match output_index
{
@for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("output"))).enumerate())
{
{{ field.0 }} => self.{{ field.1.ident }}.create_anonymous_stream(capacity),
}
_ => panic!("output_index out of bounds."),
};
return output;
}
)
}
#[zyn::element]
fn out_inner_type(ty: zyn::syn::Type) -> zyn::TokenStream
{

View File

@ -1,14 +1,10 @@
use zyn::Fields;
use zyn::FromInput;
use zyn::ToTokens;
use zyn::ast::at;
use zyn::ext::AttrExt;
use zyn::ext::FieldsExt;
use zyn::ext::TypeExt;
use zyn::quote::quote;
use zyn::syn::Field;
use zyn::syn::GenericParam;
use zyn::syn::Lifetime;
use zyn::syn::parse_quote;
use crate::SyncBlockConfig;

View File

@ -1,6 +1,8 @@
use crate::edge::BlockIOIndex;
use crate::io::AnonymousStreamConsumer;
use crate::io::AnonymousStreamProducer;
use crate::io::AnonymousIn;
use crate::io::AnonymousOut;
use crate::io::In;
use crate::io::Out;
use crate::io::edge::BlockIOIndex;
pub enum BlockResult
{
@ -15,28 +17,60 @@ pub enum BlockResult
Exit,
}
pub trait BlockInput
{
fn get_inputs_mut(&mut self) -> Vec<&mut dyn AnonymousIn>;
fn get_inputs(&self) -> Vec<&dyn AnonymousIn>;
// Meta information
fn get_types_names(&self) -> Vec<&'static str>;
}
pub trait BlockOutput
{
fn get_outputs_mut(&mut self) -> Vec<&mut dyn AnonymousOut>;
fn get_outputs(&self) -> Vec<&dyn AnonymousOut>;
// Meta information
fn get_types_names(&self) -> Vec<&'static str>;
}
pub trait BlockIO
{
fn get_inputs_mut(&mut self) -> Vec<&mut dyn AnonymousIn>;
fn get_outputs_mut(&mut self) -> Vec<&mut dyn AnonymousOut>;
fn get_inputs(&self) -> Vec<&dyn AnonymousIn>;
fn get_outputs(&self) -> Vec<&dyn AnonymousOut>;
fn get_successors(&self) -> Vec<BlockIOIndex>
{
self.get_outputs()
.iter()
.map(|x| x.get_consumer_block().unwrap())
.collect()
}
// Get all of the BlockIOIndices (block index + port) that
// this blocks can send data to.
fn get_successors(&self) -> Vec<BlockIOIndex>;
// Sets the index of the current blocks on the shared edges
fn set_index(&self, block_index: usize);
// Number of input/output ports
fn input_count(&self) -> usize;
fn output_count(&self) -> usize;
// Stream managment
fn set_anonymous_out_stream(&mut self, output_index: usize, producer: AnonymousStreamProducer);
fn set_anonymous_in_stream(&mut self, input_index: usize, consumer: AnonymousStreamConsumer);
fn create_anonymous_stream_for(
&mut self,
output_index: usize,
capacity: usize,
) -> (AnonymousStreamProducer, AnonymousStreamConsumer);
// fn get_successors(&self) -> Vec<BlockIOIndex>;
//
// // Sets the index of the current blocks on the shared edges
// fn set_index(&self, block_index: usize);
//
// // Number of input/output ports
// fn input_count(&self) -> usize;
// fn output_count(&self) -> usize;
//
// // Stream managment
// fn set_anonymous_out_stream(&mut self, output_index: usize, producer: AnonymousStreamProducer);
// fn set_anonymous_in_stream(&mut self, input_index: usize, consumer: AnonymousStreamConsumer);
//
// fn create_anonymous_stream_for(
// &mut self,
// output_index: usize,
// capacity: usize,
// ) -> (AnonymousStreamProducer, AnonymousStreamConsumer);
// Meta information
fn get_block_name(&self) -> &'static str;
@ -67,3 +101,173 @@ pub trait SyncBlock<'view>: SyncBlockIO<'view>
pub trait GraphableBlock: Block + BlockIO {}
impl<T> GraphableBlock for T where T: Block + BlockIO {}
impl<T: 'static> BlockInput for In<T>
{
fn get_inputs_mut(&mut self) -> Vec<&mut dyn AnonymousIn>
{
vec![self]
}
fn get_inputs(&self) -> Vec<&dyn AnonymousIn>
{
vec![self]
}
fn get_types_names(&self) -> Vec<&'static str>
{
vec![std::any::type_name::<T>()]
}
}
impl<I: BlockInput> BlockInput for Option<I>
{
fn get_inputs_mut(&mut self) -> Vec<&mut dyn AnonymousIn>
{
if let Some(input) = self
{
input.get_inputs_mut()
}
else
{
vec![]
}
}
fn get_inputs(&self) -> Vec<&dyn AnonymousIn>
{
if let Some(input) = self
{
input.get_inputs()
}
else
{
vec![]
}
}
fn get_types_names(&self) -> Vec<&'static str>
{
if let Some(input) = self
{
input.get_types_names()
}
else
{
vec![]
}
}
}
impl<I: BlockInput, const N: usize> BlockInput for [I; N]
{
fn get_inputs(&self) -> Vec<&dyn AnonymousIn>
{
let mut output = vec![];
for input in self
{
output.extend(input.get_inputs());
}
output
}
fn get_inputs_mut(&mut self) -> Vec<&mut dyn AnonymousIn>
{
let mut output = vec![];
for input in self
{
output.extend(input.get_inputs_mut());
}
output
}
fn get_types_names(&self) -> Vec<&'static str>
{
vec![std::any::type_name::<I>(); N]
}
}
impl<T: 'static> BlockOutput for Out<T>
{
fn get_outputs_mut(&mut self) -> Vec<&mut dyn AnonymousOut>
{
vec![self]
}
fn get_outputs(&self) -> Vec<&dyn AnonymousOut>
{
vec![self]
}
fn get_types_names(&self) -> Vec<&'static str>
{
vec![std::any::type_name::<T>()]
}
}
impl<I: BlockOutput> BlockOutput for Option<I>
{
fn get_outputs_mut(&mut self) -> Vec<&mut dyn AnonymousOut>
{
if let Some(output) = self
{
output.get_outputs_mut()
}
else
{
vec![]
}
}
fn get_outputs(&self) -> Vec<&dyn AnonymousOut>
{
if let Some(output) = self
{
output.get_outputs()
}
else
{
vec![]
}
}
fn get_types_names(&self) -> Vec<&'static str>
{
if let Some(input) = self
{
input.get_types_names()
}
else
{
vec![]
}
}
}
impl<I: BlockOutput, const N: usize> BlockOutput for [I; N]
{
fn get_outputs_mut(&mut self) -> Vec<&mut dyn AnonymousOut>
{
let mut result = vec![];
for output in self
{
result.extend(output.get_outputs_mut());
}
result
}
fn get_outputs(&self) -> Vec<&dyn AnonymousOut>
{
let mut result = vec![];
for output in self
{
result.extend(output.get_outputs());
}
result
}
fn get_types_names(&self) -> Vec<&'static str>
{
vec![std::any::type_name::<I>(); N]
}
}

View File

@ -1,20 +0,0 @@
use std::any::Any;
#[derive(Default)]
pub struct Edge
{
// Represents the index of the block owning the Out end in the graph
// And the the output index within that block
pub from: Option<BlockIOIndex>,
// Represents the index of the block owning the In end in the graph
// And the the input index within that block
pub to: Option<BlockIOIndex>,
}
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub struct BlockIOIndex
{
pub block_index: usize,
pub port_index: usize,
}

View File

@ -1,5 +0,0 @@
// Represents a FlowGrahWide, simultaneous event
pub enum FlowGraphEvent
{
Kill(String),
}

View File

@ -1,6 +1,13 @@
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::thread::JoinHandle;
use crossbeam_deque::Steal;
use crossbeam_deque::Worker;
use crate::block;
use crate::block::GraphableBlock;
use crate::io::edge::BlockIOIndex;
#[macro_export]
macro_rules! flowgraph
@ -17,6 +24,21 @@ macro_rules! flowgraph
}
}
pub struct RunningGraph
{
worker_handles: Vec<JoinHandle<()>>,
}
impl RunningGraph
{
pub fn join(self)
{
self.worker_handles.into_iter().for_each(|j| {
let _ = j.join();
});
}
}
pub struct FlowGraph
{
blocks: Vec<Box<dyn GraphableBlock + Send + 'static>>,
@ -31,48 +53,157 @@ impl FlowGraph
pub fn add_block<T: GraphableBlock + Send + 'static>(&mut self, block: T)
{
block.set_index(self.blocks.len());
block.get_inputs().iter().enumerate().for_each(|(i, x)| {
x.set_index(BlockIOIndex {
block_index: self.blocks.len(),
port_index: i,
})
});
block.get_outputs().iter().enumerate().for_each(|(i, x)| {
x.set_index(BlockIOIndex {
block_index: self.blocks.len(),
port_index: i,
})
});
self.blocks.push(Box::new(block));
}
pub fn run(mut self) -> JoinHandle<()>
// pub fn run(mut self, thread_count: usize) -> RunningGraph
// {
// self.populate_edges();
//
// let mut worker_queues = (0..thread_count).map(|_| vec![]).collect::<Vec<_>>();
//
// for (i, block) in self.blocks.into_iter().enumerate()
// {
// worker_queues[i % thread_count].push(block);
// }
//
// let running = Arc::new(AtomicBool::new(true));
// let worker_handles = worker_queues
// .into_iter()
// .map(|mut queue| {
// let running = running.clone();
// std::thread::spawn(move || {
// 'outer: while running.load(std::sync::atomic::Ordering::Relaxed)
// {
// for block in queue.iter_mut()
// {
// match block.work()
// {
// crate::block::BlockResult::Ok =>
// {
// // Reschedule block
// }
// crate::block::BlockResult::Terminated =>
// { // DROP BLOCK
// }
// crate::block::BlockResult::Exit =>
// {
// println!("KILLING GRAPH");
// break 'outer;
// }
// }
// }
// }
// running.store(false, std::sync::atomic::Ordering::Relaxed);
// })
// })
// .collect::<Vec<_>>();
// RunningGraph { worker_handles }
// }
pub fn run(mut self, thread_count: usize) -> RunningGraph
{
self.populate_edges();
std::thread::spawn(move || {
'outer: loop
{
for x in self.blocks.iter_mut()
{
match x.work()
let worker_queues = (0..thread_count)
.map(|_| Worker::<Box<dyn GraphableBlock + Send + 'static>>::new_fifo())
.collect::<Vec<_>>();
worker_queues
.iter()
.cycle()
.zip(self.blocks)
.for_each(|(worker, block)| worker.push(block));
let stealers = worker_queues
.iter()
.map(|x| x.stealer())
.collect::<Vec<_>>();
let running = Arc::new(AtomicBool::new(true));
let worker_handles = worker_queues
.into_iter()
.map(|queue| {
let stealers = stealers.clone();
let running = running.clone();
std::thread::spawn(move || {
'outer: while running.load(std::sync::atomic::Ordering::Relaxed)
{
crate::block::BlockResult::Ok =>
{}
crate::block::BlockResult::Terminated =>
{ //break 'outer;
}
crate::block::BlockResult::Exit =>
// Try to get a job
let mut block = queue.pop().unwrap_or_else(|| {
std::iter::repeat_with(|| {
stealers
.iter()
.map(|stealer| stealer.steal_batch_and_pop(&queue))
.collect::<Steal<_>>()
.success()
})
.find(|x| x.is_some())
.unwrap()
.unwrap()
});
match block.work()
{
println!("KILLING GRAPH");
break 'outer;
crate::block::BlockResult::Ok =>
{
// Reschedule block
queue.push(block);
}
crate::block::BlockResult::Terminated =>
{ // DROP BLOCK
}
crate::block::BlockResult::Exit =>
{
println!("KILLING GRAPH");
break 'outer;
}
}
}
}
}
})
running.store(false, std::sync::atomic::Ordering::Relaxed);
})
})
.collect::<Vec<_>>();
RunningGraph { worker_handles }
}
fn populate_edges(&mut self)
{
for block_index in 0..self.blocks.len()
{
let successors = self.blocks[block_index].get_successors();
for (output_index, succ_id) in successors.iter().enumerate()
let outputs = self.blocks[block_index].get_outputs_mut();
let mut rxs = vec![];
for output in outputs.into_iter()
{
let (tx, rx) =
self.blocks[block_index].create_anonymous_stream_for(output_index, 4096);
self.blocks[block_index].set_anonymous_out_stream(output_index, tx);
self.blocks[succ_id.block_index].set_anonymous_in_stream(succ_id.port_index, rx);
//let (tx, rx) = output.create_anonymous_stream(4096);
let (tx, rx) = output.create_anonymous_stream(8192);
//let (tx, rx) = output.create_anonymous_stream(65536);
output.set_anonymous_stream(tx);
rxs.push((
output
.get_consumer_block()
.expect("Non existent destination block."),
rx,
))
}
for (index, rx) in rxs.into_iter()
{
self.blocks[index.block_index].get_inputs_mut()[index.port_index]
.set_anonymous_stream(rx);
}
}
}

View File

@ -5,17 +5,20 @@ use std::sync::Mutex;
use oxydsp_flowgraph_macros::generate_pop_iterable_tuple_impl;
use oxydsp_flowgraph_macros::impl_iterator_for_pop_iter_tuple;
use crate::edge::BlockIOIndex;
use crate::edge::Edge;
use crate::stream::StreamConsumer;
use crate::stream::StreamProducer;
use crate::stream::StreamReader;
use crate::stream::StreamWriter;
use crate::stream::{self};
use crate::tag::Tag;
use crate::tag::TagSlot;
use crate::tag::Tagged;
pub mod edge;
use crate::io::edge::BlockIOIndex;
use crate::io::edge::Edge;
/// Represents a input port for a block
pub struct In<T>
{
stream: Option<StreamConsumer<T>>,
@ -25,6 +28,7 @@ pub struct In<T>
edge: Arc<Mutex<Edge>>,
}
/// Represents a output port for a block
pub struct Out<T>
{
stream: Option<StreamProducer<T>>,
@ -34,18 +38,119 @@ pub struct Out<T>
edge: Arc<Mutex<Edge>>,
}
/// Trait to manipulate a block's input in a type agnostic/erased way
pub trait AnonymousIn
{
/// Inform the input about the index of the blocks it's in, as well as its port index
fn set_index(&self, index: BlockIOIndex);
/// Returns None or the block index of the block, and the block port of the corresponding
/// Out object
fn get_producer_block(&self) -> Option<BlockIOIndex>;
/// Sets the internal stream object
fn set_anonymous_stream(&mut self, consumer: AnonymousStreamConsumer);
}
/// Trait to manipulate a block's output in a type agnostic/erased way
pub trait AnonymousOut
{
/// Inform the output about the index of the blocks it's in, as well as its port index
fn set_index(&self, index: BlockIOIndex);
/// Sets the internal stream object
fn set_anonymous_stream(&mut self, producer: AnonymousStreamProducer);
/// Returns None or the block index of the block, and the block port of the corresponding
/// In object
fn get_consumer_block(&self) -> Option<BlockIOIndex>;
/// Creates the stream with the correct corresponding type, in a type erased way.
///
/// This delegation of stream creation is necessary to allow the graph to manipulate
/// it, as it cannot know about the generic type of the stream.
fn create_anonymous_stream(
&self,
capacity: usize,
) -> (AnonymousStreamProducer, AnonymousStreamConsumer);
}
impl<T: 'static> AnonymousIn for In<T>
{
fn set_index(&self, index: BlockIOIndex)
{
self.edge.lock().unwrap().to = Some(index);
}
fn get_producer_block(&self) -> Option<BlockIOIndex>
{
self.edge.lock().unwrap().from
}
fn set_anonymous_stream(&mut self, consumer: AnonymousStreamConsumer)
{
let (stream, tag_stream) = consumer.downcast::<T>();
self.stream = Some(stream);
self.tag_stream = Some(tag_stream);
}
}
impl<T: 'static> AnonymousOut for Out<T>
{
fn set_index(&self, index: BlockIOIndex)
{
self.edge.lock().unwrap().from = Some(index);
}
fn get_consumer_block(&self) -> Option<BlockIOIndex>
{
self.edge.lock().unwrap().to
}
fn set_anonymous_stream(&mut self, producer: AnonymousStreamProducer)
{
let (stream, tag_stream) = producer.downcast::<T>();
self.stream = Some(stream);
self.tag_stream = Some(tag_stream);
}
// Delegate stream creation to Out object
// which knows the stream type
fn create_anonymous_stream(
&self,
capacity: usize,
) -> (AnonymousStreamProducer, AnonymousStreamConsumer)
{
let (tx, rx) = stream::bounded_queue::<T>(capacity);
let (tx_tag, rx_tag) = stream::bounded_queue::<TagSlot>(capacity);
((tx, tx_tag).into(), (rx, rx_tag).into())
}
}
/// A Reader to get data from an input
pub struct InReader<'a, T>
{
data_reader: StreamReader<'a, T>,
tag_reader: StreamReader<'a, TagSlot>,
}
/// A writer to send data to an output
pub struct OutWriter<'a, T>
{
data_writer: StreamWriter<'a, T>,
tag_writer: StreamWriter<'a, TagSlot>,
}
/// Creates a stream that can then be used to link blocks
///
/// ```rust
/// let (output, input) = oxydsp-flowgraph::io::stream();
///
/// let writer = output.write();
/// let reader = input.read();
///
/// // ...
/// ```
pub fn stream<T>() -> (Out<T>, In<T>)
{
let edge = Arc::new(Mutex::new(Edge::default()));
@ -65,23 +170,12 @@ pub fn stream<T>() -> (Out<T>, In<T>)
impl<T: 'static> In<T>
{
pub fn set_block_index(&self, index: BlockIOIndex)
{
self.edge.lock().unwrap().to = Some(index);
}
pub fn get_producer_block(&self) -> Option<BlockIOIndex>
{
self.edge.lock().unwrap().from
}
pub fn set_anonymous_stream(&mut self, consumer: AnonymousStreamConsumer)
{
let (stream, tag_stream) = consumer.downcast::<T>();
self.stream = Some(stream);
self.tag_stream = Some(tag_stream);
}
/// Gets a reader view from an input.
///
/// ```
/// let reader = input.read();
/// let data = reader.pop();
/// ```
pub fn read<'a>(&'a mut self) -> InReader<'a, T>
{
let data_reader = self.stream.as_mut().unwrap().read();
@ -95,35 +189,12 @@ impl<T: 'static> In<T>
impl<T: 'static> Out<T>
{
pub fn set_block_index(&self, index: BlockIOIndex)
{
self.edge.lock().unwrap().from = Some(index);
}
pub fn get_consumer_block(&self) -> Option<BlockIOIndex>
{
self.edge.lock().unwrap().to
}
pub fn set_anonymous_stream(&mut self, producer: AnonymousStreamProducer)
{
let (stream, tag_stream) = producer.downcast::<T>();
self.stream = Some(stream);
self.tag_stream = Some(tag_stream);
}
// Delegate stream creation to Out object
// which knows the stream type
pub fn create_anonymous_stream(
&self,
capacity: usize,
) -> (AnonymousStreamProducer, AnonymousStreamConsumer)
{
let (tx, rx) = stream::bounded_queue::<T>(capacity);
let (tx_tag, rx_tag) = stream::bounded_queue::<TagSlot>(capacity);
((tx, tx_tag).into(), (rx, rx_tag).into())
}
/// Gets a reader view from an output.
///
/// ```
/// let writer = output.write();
/// writer.push((data, tag).into());
/// ```
pub fn write<'a>(&'a mut self) -> OutWriter<'a, T>
{
OutWriter {
@ -132,6 +203,17 @@ impl<T: 'static> Out<T>
}
}
/// Pushes an iterator to the output, sending the maximum amount of elements
/// to the output.
///
/// It will not consume the iterator more than what can be sent.
///
/// ```
/// let writer = output.write();
///
/// // Send only 42s to the output
/// writer.push_iter(std::iter::repeat(42));
/// ```
pub fn push_iter<I: Iterator<Item = Tagged<T>>>(&mut self, mut iter: I) -> bool
{
let writer = self.write();
@ -151,7 +233,8 @@ impl<T: 'static> Out<T>
true
}
// Meta information
/// Meta information
/// Returns a string of the type of the output
pub fn get_type_name(&self) -> &'static str
{
std::any::type_name::<T>()
@ -160,16 +243,22 @@ impl<T: 'static> Out<T>
impl<T> InReader<'_, T>
{
/// Gets the amount of elements that are available
/// on the input.
pub fn len(&self) -> usize
{
self.data_reader.len()
}
/// Returns true iif no elements are available on the input.
pub fn is_empty(&self) -> bool
{
self.len() == 0
}
/// Pops an element from the input.
/// It is guaranteed to return `Some(data)` if
/// if pop was called strictly less times than len
pub fn pop(&self) -> Option<Tagged<T>>
{
let data = self.data_reader.pop_with_index();
@ -191,6 +280,9 @@ impl<T> InReader<'_, T>
}
}
/// Pops an element from the input, discarding the tag.
/// It is guaranteed to return `Some(data)` if
/// if pop was called strictly less times than len
pub fn pop_untag(&self) -> Option<T>
{
self.pop().map(|data| data.into_inner())
@ -199,16 +291,22 @@ impl<T> InReader<'_, T>
impl<T> OutWriter<'_, T>
{
/// Gets how much room is available on the output
pub fn len(&self) -> usize
{
self.data_writer.len().min(self.tag_writer.len())
}
/// Returns true iif no element can be sent
pub fn is_empty(&self) -> bool
{
self.len() == 0
}
/// Pushes some tagged data on the input.
///
/// The operation succeeds (`Ok(())`) if there is enough room
/// Or fails returning the given data to the caller.
pub fn push(&self, data: Tagged<T>) -> Result<(), Tagged<T>>
{
let (data, tag) = data.into();
@ -227,12 +325,17 @@ impl<T> OutWriter<'_, T>
}
}
/// Pushes some data on the input (not tagged).
///
/// The operation succeeds (`Ok(())`) if there is enough room
/// Or fails returning the given data to the caller.
pub fn push_no_tag(&self, data: T) -> Result<(), T>
{
self.data_writer.push(data)
}
}
/// An iterator type to push data to output(s)
pub struct PopIter<T>
{
len: usize,
@ -240,9 +343,16 @@ pub struct PopIter<T>
reader: T,
}
/// Type on which data can be popped from
pub trait PopIterable<'a>
{
type Output;
/// Returns an iterator on the input elements :
///
/// ```
/// (&mut input_a, &mut input_b, &mut input_c).pop_iter().for_each(|(a, b, c)| println!("Got {a}, {b} and {c} !"));
/// ```
fn pop_iter(&'a mut self) -> PopIter<Self::Output>;
}
@ -296,14 +406,22 @@ impl_iterator_for_pop_iter_tuple! {10}
impl_iterator_for_pop_iter_tuple! {11}
impl_iterator_for_pop_iter_tuple! {12}
// Needed for graph to be able to manipulate
// stream endings without knowing the generic type
/// StreamProducer object for data and tags stored in a type
/// agnostic/erased way.
///
/// This is needed for the graph system to manipulate and pass arround these objects
/// as they can't/don't know about the generic types of the stream objects
pub struct AnonymousStreamProducer
{
inner: Box<dyn Any>,
inner_tag: StreamProducer<TagSlot>,
}
/// StreamConsumer object for data and tags stored in a type
/// agnostic/erased way.
///
/// This is needed for the graph system to manipulate and pass arround these objects
/// as they can't/don't know about the generic types of the stream objects
pub struct AnonymousStreamConsumer
{
inner: Box<dyn Any>,
@ -353,10 +471,3 @@ impl AnonymousStreamConsumer
)
}
}
// pub trait PushIterable<'a, T, I>
// where
// I: Iterator<Item = T>,
// {
// fn push_iter(&'a mut self, iter: I) -> bool;
// }

View File

@ -0,0 +1,23 @@
/// Shared object between a block's input and output objects
/// so they can "communicate" and know about each other
#[derive(Default)]
pub struct Edge
{
/// Represents the index of the block owning the Out end in the graph
/// And the the output index within that block
pub from: Option<BlockIOIndex>,
/// Represents the index of the block owning the In end in the graph
/// And the the input index within that block
pub to: Option<BlockIOIndex>,
}
/// Reprensents the location of a port (input or output) in terms of
/// - The block index in which they exist
/// - Their input or output index within that block
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub struct BlockIOIndex
{
pub block_index: usize,
pub port_index: usize,
}

View File

@ -1,9 +1,6 @@
// This crate manages the flowgraph datastructures and execution/scheduling
// as well as the communication between the blocks
/// This crate manages the flowgraph datastructures and execution/scheduling
/// as well as the communication between the blocks
pub mod block;
pub mod edge;
pub mod event;
pub mod graph;
pub mod io;
pub mod stream;

View File

@ -1 +0,0 @@
fn main() {}

View File

@ -1,6 +1,5 @@
use std::cell::Cell;
use std::cell::UnsafeCell;
use std::io::empty;
use std::mem::MaybeUninit;
use std::ops::Deref;
use std::sync::Arc;
@ -550,6 +549,28 @@ impl<'a, T> StreamWriter<'a, T>
Err(element)
}
}
pub fn write(&self, length: usize)
{
let new = self.written.get() + length;
assert!(new < self.len());
self.written.set(new);
}
}
impl<'a, T: Copy> StreamWriter<'a, T>
{
pub fn slices_mut(&mut self) -> (&mut [MaybeUninit<T>], &mut [MaybeUninit<T>])
{
unsafe {
(
&mut *self.first.get(),
self.second
.map(|x| &mut *x.get())
.unwrap_or_else(|| &mut(&mut *self.first.get())[0..0]),
)
}
}
}
impl<'a, T> StreamReader<'a, T>
@ -643,6 +664,30 @@ impl<'a, T> StreamReader<'a, T>
None
}
}
pub fn read(&self, length: usize)
{
let new = self.read.get() + length;
assert!(new < self.len());
self.read.set(new);
}
}
impl<'a, T: Copy> StreamReader<'a, T>
{
pub fn slices(&self) -> (&[T], &[T])
{
unsafe {
(
std::mem::transmute::<&[MaybeUninit<T>], &[T]>(&*self.first.get()),
std::mem::transmute::<&[MaybeUninit<T>], &[T]>(
self.second
.map(|x| &*x.get())
.unwrap_or_else(|| &(&*self.first.get())[0..0]),
),
)
}
}
}
// When a Stream writer goes out of scope, it wrote
@ -653,13 +698,13 @@ impl<'a, T> Drop for StreamWriter<'a, T>
{
// Advance head.
// We know that this value hasn't changed since this StreamWriter was created
let head = self.producer.inner.head.load(Ordering::Relaxed);
// let head = self.producer.inner.head.load(Ordering::Relaxed);
// We want writes to the buffer to be visible when acquired in the pop side
self.producer
.inner
.head
.store(head + self.written.get(), Ordering::Release);
.store(self.start_index + self.written.get(), Ordering::Release);
}
}
@ -671,13 +716,13 @@ impl<'a, T> Drop for StreamReader<'a, T>
{
// Advance tail.
// We know that this value hasn't changed since this StreamWriter was created
let tail = self.producer.inner.tail.load(Ordering::Relaxed);
// let tail = self.producer.inner.tail.load(Ordering::Relaxed);
// We want writes to the buffer to be visible when acquired in the push side
self.producer
.inner
.tail
.store(tail + self.read.get(), Ordering::Release);
.store(self.start_index + self.read.get(), Ordering::Release);
}
}

View File

@ -48,7 +48,7 @@ use std::ops::DerefMut;
use std::sync::Arc;
use std::sync::RwLock;
/// Object to allocate tags
/// Object to allocate tags and give a unique identifier per tag
struct TagAllocator
{
// Counter to uniquely identify allocated tags
@ -58,27 +58,32 @@ struct TagAllocator
labels: HashMap<usize, (&'static str, TagLabel)>,
}
// Label for a tag like : "symbol", "packet_start", "error"
/// Label for a tag like : "symbol", "packet_start", "error"
struct TagLabel
{
label: String,
// TODO: Allow user customization of labels
// maybe multiple labels
_label: String,
}
// Front for tag allocator
/// Object from which TagKeys are obtained. This guarantees absence of collisions between tags
pub struct Tags
{
allocator: Arc<RwLock<TagAllocator>>,
}
/// Used to anotate tags entries and retrieve them
#[derive(Clone)]
pub struct TagKey<T>
{
key: usize,
owner: Arc<RwLock<TagAllocator>>,
// Maybe used later to retrieve labels for example
_owner: Arc<RwLock<TagAllocator>>,
_phantom: PhantomData<T>,
}
// Tags a particular sample within a specific stream
/// Tags a particular sample within a specific stream
#[derive(Clone)]
pub(crate) struct TagSlot
{
@ -93,7 +98,7 @@ pub(crate) struct TagSlot
pub tag: Tag,
}
// Tag key value pairs
/// A Tag object containing TagKey-value pairs
#[derive(Clone)]
pub struct Tag
{
@ -102,6 +107,7 @@ pub struct Tag
impl Tags
{
/// Creates a new tag allocator
pub fn new() -> Self
{
Self {
@ -112,12 +118,13 @@ impl Tags
}
}
/// Allocates a new unique tag key
pub fn allocate_tag<T>(&mut self, label: impl AsRef<str>) -> TagKey<T>
{
let k = self.allocator.write().unwrap().allocate_tag::<T>(label);
TagKey {
key: k,
owner: self.allocator.clone(),
_owner: self.allocator.clone(),
_phantom: Default::default(),
}
}
@ -125,6 +132,7 @@ impl Tags
impl TagAllocator
{
/// Allocates a new unique tag key
pub fn allocate_tag<T>(&mut self, label: impl AsRef<str>) -> usize
{
let key = self.counter;
@ -133,7 +141,7 @@ impl TagAllocator
(
std::any::type_name::<T>(),
TagLabel {
label: label.as_ref().to_owned(),
_label: label.as_ref().to_owned(),
},
),
);
@ -144,6 +152,7 @@ impl TagAllocator
impl Default for Tags
{
/// Creates a new tag allocator
fn default() -> Self
{
Self::new()
@ -152,6 +161,7 @@ impl Default for Tags
impl Tag
{
/// Creates a new empty tag
pub fn new() -> Self
{
Self {
@ -159,6 +169,7 @@ impl Tag
}
}
/// Creates a new tag with a (key, value) entry
pub fn with_entry<T: 'static + Send + Sync>(key: TagKey<T>, value: T) -> Self
{
let new_tag = Self::default();
@ -166,6 +177,7 @@ impl Tag
new_tag
}
/// Creates a new tag, which is the combination of the given tags
pub fn from_tags<const N: usize>(tag_opts: [&Tag; N]) -> Tag
{
let new_tag = Self::default();
@ -181,6 +193,10 @@ impl Tag
new_tag
}
/// Creates a new tag option, which is the combination of the given tag options
///
/// If all the tag options are None, None is returned
/// Otherwise it is Some of the combination of all of the tags which are Some
pub fn from_tag_opts<const N: usize>(tag_opts: [&Option<Tag>; N]) -> Option<Tag>
{
if tag_opts.iter().all(|t| t.is_none())
@ -201,11 +217,14 @@ impl Tag
Some(new_tag)
}
/// Adds a new entry in the tag. If it already exists, it is overwritten
pub fn add_entry<T: 'static + Send + Sync>(&self, key: TagKey<T>, value: T)
{
self.data.write().unwrap().insert(key.key, Arc::new(value));
}
/// Retrieves an entry in tag. If ther is no such entry corresponding to the key,
/// retruns None
pub fn retrieve<T: 'static + Send + Sync>(&self, key: &TagKey<T>) -> Option<Arc<T>>
{
let element = self.data.read().unwrap().get(&key.key).cloned();