use std::any::Any; use std::collections::binary_heap::Iter; use std::sync::Arc; use std::sync::Mutex; use oxydsp_flowgraph_macros::generate_pop_iterable_tuple_impl; use oxydsp_flowgraph_macros::impl_iterator_for_pop_iter_tuple; use crate::stream; use crate::stream::StreamConsumer; use crate::stream::StreamProducer; use crate::stream::StreamReader; use crate::stream::StreamWriter; #[derive(Default)] pub struct Edge { // Represents the index of the block owning the Out end in the graph // And the the output index within that block pub from: Option, // Represents the index of the block owning the In end in the graph // And the the input index within that block pub to: Option, } #[derive(Clone, Copy, PartialEq, Eq, Debug)] pub struct BlockIOIndex { pub block_index: usize, pub port_index: usize, } // Needed for graph to be able to manipulate // stream endings without knowing the generic type pub struct AnonymousStreamProducer { inner: Box, } pub struct AnonymousStreamConsumer { inner: Box, } impl From> for AnonymousStreamProducer { fn from(value: StreamProducer) -> Self { AnonymousStreamProducer { inner: Box::new(value), } } } impl From> for AnonymousStreamConsumer { fn from(value: StreamConsumer) -> Self { AnonymousStreamConsumer { inner: Box::new(value), } } } impl AnonymousStreamProducer { pub fn downcast(self) -> StreamProducer { *self.inner.downcast::>().unwrap() } } impl AnonymousStreamConsumer { pub fn downcast(self) -> StreamConsumer { *self.inner.downcast::>().unwrap() } } pub struct In { stream: Option>, // Will rarely be accessed edge: Arc>, } pub struct Out { stream: Option>, // Will rarely be accessed edge: Arc>, } pub fn stream() -> (Out, In) { let edge = Arc::new(Mutex::new(Edge::default())); ( Out { stream: None, edge: edge.clone(), }, In { stream: None, edge }, ) } impl In { pub fn set_block_index(&self, index: BlockIOIndex) { self.edge.lock().unwrap().to = Some(index); } pub fn get_producer_block(&self) -> Option { self.edge.lock().unwrap().from } pub fn set_anonymous_stream(&mut self, consumer: AnonymousStreamConsumer) { self.stream = Some(consumer.downcast::()) } pub fn read<'a>(&'a mut self) -> StreamReader<'a, T> { self.stream.as_mut().unwrap().read() } } impl Out { pub fn set_block_index(&self, index: BlockIOIndex) { self.edge.lock().unwrap().from = Some(index); } pub fn get_consumer_block(&self) -> Option { self.edge.lock().unwrap().to } pub fn set_anonymous_stream(&mut self, producer: AnonymousStreamProducer) { self.stream = Some(producer.downcast::()) } // Delegate stream creation to Out object // which knows the stream type pub fn create_anonymous_stream( &self, capacity: usize, ) -> (AnonymousStreamProducer, AnonymousStreamConsumer) { let (tx, rx) = stream::bounded_queue::(capacity); (tx.into(), rx.into()) } pub fn write<'a>(&'a mut self) -> StreamWriter<'a, T> { self.stream.as_mut().unwrap().write() } pub fn push_iter>(&mut self, mut iter: I) -> bool { let writer = self.write(); let len = writer.len(); for _ in 0..len { if let Some(elt) = iter.next() { let _ = writer.push(elt); } else { return false; } } true } // Meta information pub fn get_type_name(&self) -> &'static str { std::any::type_name::() } } pub struct PopIter { len: usize, popped: usize, reader: T, } pub trait PopIterable<'a> { type Output; fn pop_iter(&'a mut self) -> PopIter; } impl<'a, T: 'static> PopIterable<'a> for In { type Output = StreamReader<'a, T>; fn pop_iter(&'a mut self) -> PopIter> { let reader = self.read(); PopIter { popped: 0, len: reader.len(), reader, } } } generate_pop_iterable_tuple_impl! {2} generate_pop_iterable_tuple_impl! {3} generate_pop_iterable_tuple_impl! {4} generate_pop_iterable_tuple_impl! {5} generate_pop_iterable_tuple_impl! {6} generate_pop_iterable_tuple_impl! {7} generate_pop_iterable_tuple_impl! {8} generate_pop_iterable_tuple_impl! {9} generate_pop_iterable_tuple_impl! {10} generate_pop_iterable_tuple_impl! {11} generate_pop_iterable_tuple_impl! {12} generate_pop_iterable_tuple_impl! {13} generate_pop_iterable_tuple_impl! {14} generate_pop_iterable_tuple_impl! {15} generate_pop_iterable_tuple_impl! {16} generate_pop_iterable_tuple_impl! {17} generate_pop_iterable_tuple_impl! {18} generate_pop_iterable_tuple_impl! {19} generate_pop_iterable_tuple_impl! {20} impl<'a, T> Iterator for PopIter> { type Item = T; fn next(&mut self) -> Option { self.reader.pop() } } impl_iterator_for_pop_iter_tuple! {2} impl_iterator_for_pop_iter_tuple! {3} impl_iterator_for_pop_iter_tuple! {4} impl_iterator_for_pop_iter_tuple! {5} impl_iterator_for_pop_iter_tuple! {6} impl_iterator_for_pop_iter_tuple! {7} impl_iterator_for_pop_iter_tuple! {8} impl_iterator_for_pop_iter_tuple! {9} impl_iterator_for_pop_iter_tuple! {10} impl_iterator_for_pop_iter_tuple! {11} impl_iterator_for_pop_iter_tuple! {12} impl_iterator_for_pop_iter_tuple! {13} impl_iterator_for_pop_iter_tuple! {14} impl_iterator_for_pop_iter_tuple! {15} impl_iterator_for_pop_iter_tuple! {16} impl_iterator_for_pop_iter_tuple! {17} impl_iterator_for_pop_iter_tuple! {18} impl_iterator_for_pop_iter_tuple! {19} impl_iterator_for_pop_iter_tuple! {20}