diff --git a/oxydsp-flowgraph/src/edge.rs b/oxydsp-flowgraph/src/edge.rs index 44144f1..f89f1a1 100644 --- a/oxydsp-flowgraph/src/edge.rs +++ b/oxydsp-flowgraph/src/edge.rs @@ -80,123 +80,6 @@ impl AnonymousStreamConsumer } } -pub struct In -{ - stream: Option>, - tag_stream: Option>, - - // Will rarely be accessed - edge: Arc>, -} - -pub struct Out -{ - stream: Option>, - tag_stream: Option>, - - // Will rarely be accessed - edge: Arc>, -} - -pub fn stream() -> (Out, In) -{ - let edge = Arc::new(Mutex::new(Edge::default())); - ( - Out { - stream: None, - tag_stream: None, - edge: edge.clone(), - }, - In { - stream: None, - tag_stream: None, - edge, - }, - ) -} - -impl In -{ - pub fn set_block_index(&self, index: BlockIOIndex) - { - self.edge.lock().unwrap().to = Some(index); - } - - pub fn get_producer_block(&self) -> Option - { - self.edge.lock().unwrap().from - } - - pub fn set_anonymous_stream(&mut self, consumer: AnonymousStreamConsumer) - { - self.stream = Some(consumer.downcast::()) - } - - pub fn read<'a>(&'a mut self) -> StreamReader<'a, T> - { - self.stream.as_mut().unwrap().read() - } -} - -impl Out -{ - pub fn set_block_index(&self, index: BlockIOIndex) - { - self.edge.lock().unwrap().from = Some(index); - } - - pub fn get_consumer_block(&self) -> Option - { - self.edge.lock().unwrap().to - } - - pub fn set_anonymous_stream(&mut self, producer: AnonymousStreamProducer) - { - self.stream = Some(producer.downcast::()) - } - - // Delegate stream creation to Out object - // which knows the stream type - pub fn create_anonymous_stream( - &self, - capacity: usize, - ) -> (AnonymousStreamProducer, AnonymousStreamConsumer) - { - let (tx, rx) = stream::bounded_queue::(capacity); - (tx.into(), rx.into()) - } - - pub fn write<'a>(&'a mut self) -> StreamWriter<'a, T> - { - self.stream.as_mut().unwrap().write() - } - - pub fn push_iter>(&mut self, mut iter: I) -> bool - { - let writer = self.write(); - let len = writer.len(); - - for _ in 0..len - { - if let Some(elt) = iter.next() - { - let _ = writer.push(elt); - } - else - { - return false; - } - } - true - } - - // Meta information - pub fn get_type_name(&self) -> &'static str - { - std::any::type_name::() - } -} - pub struct PopIter { len: usize, diff --git a/oxydsp-flowgraph/src/io.rs b/oxydsp-flowgraph/src/io.rs index 68367da..68fd155 100644 --- a/oxydsp-flowgraph/src/io.rs +++ b/oxydsp-flowgraph/src/io.rs @@ -1,10 +1,16 @@ -use std::sync::{Arc, Mutex}; +use std::sync::Arc; +use std::sync::Mutex; -use crate::{ - edge::{AnonymousStreamConsumer, AnonymousStreamProducer, BlockIOIndex, Edge}, - stream::{self, StreamConsumer, StreamProducer, StreamReader, StreamWriter}, - tag::Tag, -}; +use crate::edge::AnonymousStreamConsumer; +use crate::edge::AnonymousStreamProducer; +use crate::edge::BlockIOIndex; +use crate::edge::Edge; +use crate::stream::StreamConsumer; +use crate::stream::StreamProducer; +use crate::stream::StreamReader; +use crate::stream::StreamWriter; +use crate::stream::{self}; +use crate::tag::Tag; pub struct In { @@ -72,9 +78,11 @@ impl In pub fn read<'a>(&'a mut self) -> InReader<'a, T> { + let data_reader = self.stream.as_mut().unwrap().read(); + let tag_reader = self.tag_stream.as_mut().unwrap().read(); InReader { - data_reader: self.stream.as_mut().unwrap().read(), - tag_reader: self.tag_stream.as_mut().unwrap().read(), + data_reader, + tag_reader, } } } @@ -140,3 +148,37 @@ impl Out std::any::type_name::() } } + +impl InReader<'_, T> +{ + pub fn len(&self) -> usize + { + self.data_reader.len() + } + + pub fn pop_tagged(&self) -> Option<(T, Option)> + { + let data = self.data_reader.pop_with_index(); + if let Some((data, index)) = data + { + let mut tag = None; + if self + .tag_reader + .peek(|t| t.position) + .is_some_and(|x| x == index) + { + tag = self.tag_reader.pop(); + } + Some((data, tag)) + } + else + { + None + } + } + + pub fn pop_drop_tag(&self) -> Option + { + self.pop_tagged().map(|(data, _)| data) + } +} diff --git a/oxydsp-flowgraph/src/stream.rs b/oxydsp-flowgraph/src/stream.rs index 1f2aedd..489e4b1 100644 --- a/oxydsp-flowgraph/src/stream.rs +++ b/oxydsp-flowgraph/src/stream.rs @@ -1,5 +1,6 @@ use std::cell::Cell; use std::cell::UnsafeCell; +use std::io::empty; use std::mem::MaybeUninit; use std::ops::Deref; use std::sync::Arc; @@ -62,6 +63,11 @@ pub struct StreamWriter<'a, T> first_len: usize, second_len: usize, written: Cell, + + // Index of the first element to be pushed + // within the "infinite buffer" + // Used to number tags + start_index: usize, } // Represents a read operation within a stream producer @@ -73,6 +79,11 @@ pub struct StreamReader<'a, T> first_len: usize, second_len: usize, read: Cell, + + // Index of the first element to be read + // within the "infinite buffer" + // Used to number tags + start_index: usize, } pub fn bounded_queue(capacity: usize) -> (StreamProducer, StreamConsumer) @@ -168,6 +179,8 @@ impl StreamProducer &UnsafeCell<[MaybeUninit]>, >(start_to_tail)); StreamWriter { + start_index: head, + producer: self, first, second, @@ -224,6 +237,8 @@ impl StreamProducer >(start_to_tail)); StreamWriter { + start_index: head, + producer: self, first, second, @@ -258,9 +273,12 @@ impl StreamProducer // Head and tail are both indices of the slice unsafe { let k = &mut *self.inner.buffer.get(); + let len = wrapped_tail - wrapped_head; StreamWriter { + start_index: head, + producer: self, - first_len: wrapped_tail - wrapped_head, + first_len: len, second_len: 0, first: std::mem::transmute::< &[MaybeUninit], @@ -294,9 +312,12 @@ impl StreamConsumer // Buffer is empty. Return empty slice unsafe { let k = &mut *self.inner.buffer.get(); + let len = wrapped_head - wrapped_tail; StreamReader { + start_index: tail, + producer: self, - first_len: wrapped_head - wrapped_tail, + first_len: len, second_len: 0, first: std::mem::transmute::<&[MaybeUninit], &UnsafeCell<[MaybeUninit]>>( &k[wrapped_tail..wrapped_head], @@ -327,9 +348,12 @@ impl StreamConsumer // Head and tail are both indices of the slice unsafe { let k = &mut *self.inner.buffer.get(); + let len = wrapped_head - wrapped_tail; StreamReader { + start_index: tail, + producer: self, - first_len: wrapped_head - wrapped_tail, + first_len: len, second_len: 0, first: std::mem::transmute::< &[MaybeUninit], @@ -385,6 +409,8 @@ impl StreamConsumer >(start_to_head)); StreamReader { + start_index: tail, + producer: self, first, second, @@ -410,7 +436,10 @@ impl StreamConsumer // First slice let mut first_kept = 0; - for element in reader.first.get_mut() + // SAFETY: + // + // Only us can have a reference to these slices of the buffer + for element in unsafe { &*reader.first.get() } { // SAFETY // @@ -425,18 +454,57 @@ impl StreamConsumer reader.second_len = 0; reader.second = None; + // Trim first slice reader.first_len = first_kept; unsafe { - *reader.first = - std::mem::transmute::< - &mut [MaybeUninit], - &mut UnsafeCell<[MaybeUninit]>, - >(&mut (reader.first.get_mut())[0..first_kept]); + reader.first = std::mem::transmute::< + &[MaybeUninit], + &UnsafeCell<[MaybeUninit]>, + >(&(&*reader.first.get())[0..first_kept]); } + + return reader; + } + first_kept += 1; + } + + // If we are here, all of the elements of the first slice, satisfy the predicate + + if let Some(second_slice) = &mut reader.second + { + // Second slice + let mut second_kept = 0; + // SAFETY: + // + // Only us can have a reference to these slices of the buffer + for element in unsafe { &*second_slice.get() } + { + // SAFETY + // + // If this element is in a reader returned by self.read + // with no pop called, we know it is initialized + let init_element = unsafe { element.assume_init_ref() }; + let sat = predicate(init_element); + if !sat + { + // Stop here + // Trim second slice + reader.second_len = second_kept; + unsafe { + reader.second = Some(std::mem::transmute::< + &[MaybeUninit], + &UnsafeCell<[MaybeUninit]>, + >( + &(&*second_slice.get())[0..first_kept] + )); + } + return reader; + } + second_kept += 1; } } - todo!() + return reader; } } @@ -452,6 +520,11 @@ impl<'a, T> StreamWriter<'a, T> self.len() == 0 } + pub fn next_index(&self) -> usize + { + self.start_index + self.written.get() + } + pub fn push(&self, element: T) -> Result<(), T> { if self.written.get() < self.first_len @@ -491,6 +564,49 @@ impl<'a, T> StreamReader<'a, T> self.len() == 0 } + pub fn last_index(&self) -> usize + { + self.start_index + self.len() + } + + pub fn next_index(&self) -> usize + { + self.start_index + self.read.get() + } + + pub fn pop_with_index(&self) -> Option<(T, usize)> + { + let index = self.next_index(); + self.pop().map(|t| (t, index)) + } + + pub fn peek(&self, peeker: F) -> Option + where + F: Fn(&T) -> O, + { + // Same as pop, without taking, or increasing read count + if self.read.get() < self.first_len + { + // SAFETY: + // + // If element is in this slice, it is initialized. + // We take it once since read increases + let element = unsafe { (&mut *self.first.get())[self.read.get()].assume_init_ref() }; + Some(peeker(element)) + } + else if let Some(second) = &self.second + && self.read.get() - self.first_len < self.second_len + { + let element = + unsafe { (&mut *second.get())[self.read.get() - self.first_len].assume_init_ref() }; + Some(peeker(element)) + } + else + { + None + } + } + pub fn pop(&self) -> Option { if self.read.get() < self.first_len