Starts synchronous block interface

This commit is contained in:
2026-03-19 15:28:31 +01:00
parent f727c119b8
commit 2baee8d28b
11 changed files with 348 additions and 260 deletions

View File

@ -1,4 +1,7 @@
use crate::edge::{AnonymousStreamConsumer, AnonymousStreamProducer, BlockIOIndex};
use crate::{
edge::BlockIOIndex,
io::{AnonymousStreamConsumer, AnonymousStreamProducer},
};
pub enum BlockResult
{
@ -46,13 +49,18 @@ pub trait Block
fn work(&mut self) -> BlockResult;
}
pub trait SyncBlock
// Represents the input, output, state types
// that a SyncBlock will have to interacti with
pub trait SyncBlockIO
{
type StateView;
type Input;
type Output;
type State;
}
fn sync_work(state: &mut Self::State, input: Self::Input) -> Option<Self::Output>;
pub trait SyncBlock: SyncBlockIO
{
fn sync_work(state: Self::StateView, input: Self::Input) -> Option<Self::Output>;
}
pub trait GraphableBlock: Block + BlockIO {}

View File

@ -1,17 +1,4 @@
use std::any::Any;
use std::collections::binary_heap::Iter;
use std::sync::Arc;
use std::sync::Mutex;
use oxydsp_flowgraph_macros::generate_pop_iterable_tuple_impl;
use oxydsp_flowgraph_macros::impl_iterator_for_pop_iter_tuple;
use crate::stream;
use crate::stream::StreamConsumer;
use crate::stream::StreamProducer;
use crate::stream::StreamReader;
use crate::stream::StreamWriter;
use crate::tag::Tag;
#[derive(Default)]
pub struct Edge
@ -31,128 +18,3 @@ pub struct BlockIOIndex
pub block_index: usize,
pub port_index: usize,
}
// Needed for graph to be able to manipulate
// stream endings without knowing the generic type
pub struct AnonymousStreamProducer
{
inner: Box<dyn Any>,
}
pub struct AnonymousStreamConsumer
{
inner: Box<dyn Any>,
}
impl<T: 'static> From<StreamProducer<T>> for AnonymousStreamProducer
{
fn from(value: StreamProducer<T>) -> Self
{
AnonymousStreamProducer {
inner: Box::new(value),
}
}
}
impl<T: 'static> From<StreamConsumer<T>> for AnonymousStreamConsumer
{
fn from(value: StreamConsumer<T>) -> Self
{
AnonymousStreamConsumer {
inner: Box::new(value),
}
}
}
impl AnonymousStreamProducer
{
pub fn downcast<T: 'static>(self) -> StreamProducer<T>
{
*self.inner.downcast::<StreamProducer<T>>().unwrap()
}
}
impl AnonymousStreamConsumer
{
pub fn downcast<T: 'static>(self) -> StreamConsumer<T>
{
*self.inner.downcast::<StreamConsumer<T>>().unwrap()
}
}
pub struct PopIter<T>
{
len: usize,
popped: usize,
reader: T,
}
pub trait PopIterable<'a>
{
type Output;
fn pop_iter(&'a mut self) -> PopIter<Self::Output>;
}
impl<'a, T: 'static> PopIterable<'a> for In<T>
{
type Output = StreamReader<'a, T>;
fn pop_iter(&'a mut self) -> PopIter<StreamReader<'a, T>>
{
let reader = self.read();
PopIter {
popped: 0,
len: reader.len(),
reader,
}
}
}
generate_pop_iterable_tuple_impl! {2}
generate_pop_iterable_tuple_impl! {3}
generate_pop_iterable_tuple_impl! {4}
generate_pop_iterable_tuple_impl! {5}
generate_pop_iterable_tuple_impl! {6}
generate_pop_iterable_tuple_impl! {7}
generate_pop_iterable_tuple_impl! {8}
generate_pop_iterable_tuple_impl! {9}
generate_pop_iterable_tuple_impl! {10}
generate_pop_iterable_tuple_impl! {11}
generate_pop_iterable_tuple_impl! {12}
generate_pop_iterable_tuple_impl! {13}
generate_pop_iterable_tuple_impl! {14}
generate_pop_iterable_tuple_impl! {15}
generate_pop_iterable_tuple_impl! {16}
generate_pop_iterable_tuple_impl! {17}
generate_pop_iterable_tuple_impl! {18}
generate_pop_iterable_tuple_impl! {19}
generate_pop_iterable_tuple_impl! {20}
impl<'a, T> Iterator for PopIter<StreamReader<'a, T>>
{
type Item = T;
fn next(&mut self) -> Option<Self::Item>
{
self.reader.pop()
}
}
impl_iterator_for_pop_iter_tuple! {2}
impl_iterator_for_pop_iter_tuple! {3}
impl_iterator_for_pop_iter_tuple! {4}
impl_iterator_for_pop_iter_tuple! {5}
impl_iterator_for_pop_iter_tuple! {6}
impl_iterator_for_pop_iter_tuple! {7}
impl_iterator_for_pop_iter_tuple! {8}
impl_iterator_for_pop_iter_tuple! {9}
impl_iterator_for_pop_iter_tuple! {10}
impl_iterator_for_pop_iter_tuple! {11}
impl_iterator_for_pop_iter_tuple! {12}
impl_iterator_for_pop_iter_tuple! {13}
impl_iterator_for_pop_iter_tuple! {14}
impl_iterator_for_pop_iter_tuple! {15}
impl_iterator_for_pop_iter_tuple! {16}
impl_iterator_for_pop_iter_tuple! {17}
impl_iterator_for_pop_iter_tuple! {18}
impl_iterator_for_pop_iter_tuple! {19}
impl_iterator_for_pop_iter_tuple! {20}

View File

@ -1,8 +1,10 @@
use std::any::Any;
use std::sync::Arc;
use std::sync::Mutex;
use crate::edge::AnonymousStreamConsumer;
use crate::edge::AnonymousStreamProducer;
use oxydsp_flowgraph_macros::generate_pop_iterable_tuple_impl;
use oxydsp_flowgraph_macros::impl_iterator_for_pop_iter_tuple;
use crate::edge::BlockIOIndex;
use crate::edge::Edge;
use crate::stream::StreamConsumer;
@ -73,7 +75,9 @@ impl<T: 'static> In<T>
pub fn set_anonymous_stream(&mut self, consumer: AnonymousStreamConsumer)
{
self.stream = Some(consumer.downcast::<T>())
let (stream, tag_stream) = consumer.downcast::<T>();
self.stream = Some(stream);
self.tag_stream = Some(tag_stream);
}
pub fn read<'a>(&'a mut self) -> InReader<'a, T>
@ -101,7 +105,9 @@ impl<T: 'static> Out<T>
pub fn set_anonymous_stream(&mut self, producer: AnonymousStreamProducer)
{
self.stream = Some(producer.downcast::<T>())
let (stream, tag_stream) = producer.downcast::<T>();
self.stream = Some(stream);
self.tag_stream = Some(tag_stream);
}
// Delegate stream creation to Out object
@ -112,7 +118,8 @@ impl<T: 'static> Out<T>
) -> (AnonymousStreamProducer, AnonymousStreamConsumer)
{
let (tx, rx) = stream::bounded_queue::<T>(capacity);
(tx.into(), rx.into())
let (tx_tag, rx_tag) = stream::bounded_queue::<Tag>(capacity);
((tx, tx_tag).into(), (rx, rx_tag).into())
}
pub fn write<'a>(&'a mut self) -> OutWriter<'a, T>
@ -123,7 +130,7 @@ impl<T: 'static> Out<T>
}
}
pub fn push_iter<I: Iterator<Item = T>>(&mut self, mut iter: I) -> bool
pub fn push_iter<I: Iterator<Item = (T, Option<Tag>)>>(&mut self, mut iter: I) -> bool
{
let writer = self.write();
let len = writer.len();
@ -132,7 +139,17 @@ impl<T: 'static> Out<T>
{
if let Some(elt) = iter.next()
{
let _ = writer.push(elt);
match elt.1
{
Some(tag) =>
{
let _ = writer.push_tagged(elt.0, tag);
}
None =>
{
let _ = writer.push_no_tag(elt.0);
}
}
}
else
{
@ -182,3 +199,179 @@ impl<T> InReader<'_, T>
self.pop_tagged().map(|(data, _)| data)
}
}
impl<T> OutWriter<'_, T>
{
pub fn len(&self) -> usize
{
self.data_writer.len().min(self.tag_writer.len())
}
pub fn push(&self, (data, tag): (T, Option<Tag>)) -> Result<(), (T, Option<Tag>)>
{
match self.data_writer.push(data)
{
Ok(_) if tag.is_some() =>
{
let _ = self.tag_writer.push(tag.unwrap());
Ok(())
}
Ok(_) => Ok(()),
Err(data) => Err((data, tag)),
}
}
pub fn push_tagged(&self, data: T, tag: Tag) -> Result<(), (T, Tag)>
{
let res = self.data_writer.push(data);
match res
{
Ok(_) =>
{
let _ = self.tag_writer.push(tag);
Ok(())
}
Err(t) => Err((t, tag)),
}
}
pub fn push_no_tag(&self, data: T) -> Result<(), T>
{
self.data_writer.push(data)
}
}
pub struct PopIter<T>
{
len: usize,
popped: usize,
reader: T,
}
pub trait PopIterable<'a>
{
type Output;
fn pop_iter(&'a mut self) -> PopIter<Self::Output>;
}
impl<'a, T: 'static> PopIterable<'a> for In<T>
{
type Output = InReader<'a, T>;
fn pop_iter(&'a mut self) -> PopIter<InReader<'a, T>>
{
let reader = self.read();
PopIter {
popped: 0,
len: reader.len(),
reader,
}
}
}
generate_pop_iterable_tuple_impl! {2}
generate_pop_iterable_tuple_impl! {3}
generate_pop_iterable_tuple_impl! {4}
generate_pop_iterable_tuple_impl! {5}
generate_pop_iterable_tuple_impl! {6}
generate_pop_iterable_tuple_impl! {7}
generate_pop_iterable_tuple_impl! {8}
generate_pop_iterable_tuple_impl! {9}
generate_pop_iterable_tuple_impl! {10}
generate_pop_iterable_tuple_impl! {11}
generate_pop_iterable_tuple_impl! {12}
generate_pop_iterable_tuple_impl! {13}
generate_pop_iterable_tuple_impl! {14}
generate_pop_iterable_tuple_impl! {15}
generate_pop_iterable_tuple_impl! {16}
generate_pop_iterable_tuple_impl! {17}
generate_pop_iterable_tuple_impl! {18}
generate_pop_iterable_tuple_impl! {19}
generate_pop_iterable_tuple_impl! {20}
impl<'a, T> Iterator for PopIter<InReader<'a, T>>
{
type Item = (T, Option<Tag>);
fn next(&mut self) -> Option<Self::Item>
{
self.reader.pop_tagged()
}
}
impl_iterator_for_pop_iter_tuple! {2}
impl_iterator_for_pop_iter_tuple! {3}
impl_iterator_for_pop_iter_tuple! {4}
impl_iterator_for_pop_iter_tuple! {5}
impl_iterator_for_pop_iter_tuple! {6}
impl_iterator_for_pop_iter_tuple! {7}
impl_iterator_for_pop_iter_tuple! {8}
impl_iterator_for_pop_iter_tuple! {9}
impl_iterator_for_pop_iter_tuple! {10}
impl_iterator_for_pop_iter_tuple! {11}
impl_iterator_for_pop_iter_tuple! {12}
impl_iterator_for_pop_iter_tuple! {13}
impl_iterator_for_pop_iter_tuple! {14}
impl_iterator_for_pop_iter_tuple! {15}
impl_iterator_for_pop_iter_tuple! {16}
impl_iterator_for_pop_iter_tuple! {17}
impl_iterator_for_pop_iter_tuple! {18}
impl_iterator_for_pop_iter_tuple! {19}
impl_iterator_for_pop_iter_tuple! {20}
// Needed for graph to be able to manipulate
// stream endings without knowing the generic type
pub struct AnonymousStreamProducer
{
inner: Box<dyn Any>,
inner_tag: StreamProducer<Tag>,
}
pub struct AnonymousStreamConsumer
{
inner: Box<dyn Any>,
inner_tag: StreamConsumer<Tag>,
}
impl<T: 'static> From<(StreamProducer<T>, StreamProducer<Tag>)> for AnonymousStreamProducer
{
fn from(value: (StreamProducer<T>, StreamProducer<Tag>)) -> Self
{
AnonymousStreamProducer {
inner: Box::new(value.0),
inner_tag: value.1,
}
}
}
impl<T: 'static> From<(StreamConsumer<T>, StreamConsumer<Tag>)> for AnonymousStreamConsumer
{
fn from(value: (StreamConsumer<T>, StreamConsumer<Tag>)) -> Self
{
AnonymousStreamConsumer {
inner: Box::new(value.0),
inner_tag: value.1,
}
}
}
impl AnonymousStreamProducer
{
pub fn downcast<T: 'static>(self) -> (StreamProducer<T>, StreamProducer<Tag>)
{
(
*self.inner.downcast::<StreamProducer<T>>().unwrap(),
self.inner_tag,
)
}
}
impl AnonymousStreamConsumer
{
pub fn downcast<T: 'static>(self) -> (StreamConsumer<T>, StreamConsumer<Tag>)
{
(
*self.inner.downcast::<StreamConsumer<T>>().unwrap(),
self.inner_tag,
)
}
}

View File

@ -17,7 +17,51 @@ pub struct Tag
// TODO: Make it such that when a tag is duplicated, the data seems to be too:
// When adding on a duplicate, it should not replicate on others, but without
// requiring a deep copy.
pub data: Arc<Mutex<HashMap<String, Box<dyn Any>>>>,
pub data: Arc<Mutex<HashMap<String, Arc<dyn Any + Send + Sync>>>>,
}
pub trait TagValue: Clone {}
impl<T> TagValue for T where T: Clone {}
pub trait TagMergable<T>
{
fn merge(&self, other: &T) -> Self;
}
impl TagMergable<Tag> for Tag
{
fn merge(&self, other: &Self) -> Self
{
// TODO: More performant merge
let mut new = other.clone();
new.position = self.position;
{
let mut data_locked = new.data.lock().unwrap();
for (k, v) in self.data.lock().unwrap().iter()
{
data_locked.insert(k.clone(), v.clone());
}
}
new
}
}
impl TagMergable<Option<Tag>> for Option<Tag>
{
fn merge(&self, other: &Self) -> Self
{
match self
{
Some(first) => match other
{
Some(other) => Some(first.merge(other)),
None => Some(first.clone()),
},
None => other.clone(),
}
}
}
/// Represents a data, with a potential tag attached to it.