From 81cac2f239a0faaec4fae77b3bb3aebb8172185c Mon Sep 17 00:00:00 2001 From: Albin Chaboissier Date: Thu, 9 Apr 2026 20:30:05 +0200 Subject: [PATCH] Starts stream rework --- oxydsp-flowgraph/src/io.rs | 358 +++++++++--------- oxydsp-flowgraph/src/stream.rs | 663 +++++++++------------------------ 2 files changed, 367 insertions(+), 654 deletions(-) diff --git a/oxydsp-flowgraph/src/io.rs b/oxydsp-flowgraph/src/io.rs index b3646a9..e6f6eeb 100644 --- a/oxydsp-flowgraph/src/io.rs +++ b/oxydsp-flowgraph/src/io.rs @@ -1,14 +1,10 @@ use std::any::Any; +use std::mem::MaybeUninit; use std::sync::Arc; use std::sync::Mutex; -use oxydsp_flowgraph_macros::generate_pop_iterable_tuple_impl; -use oxydsp_flowgraph_macros::impl_iterator_for_pop_iter_tuple; - use crate::stream::StreamConsumer; use crate::stream::StreamProducer; -use crate::stream::StreamReader; -use crate::stream::StreamWriter; use crate::stream::{self}; use crate::tag::TagSlot; use crate::tag::Tagged; @@ -130,15 +126,19 @@ impl AnonymousOut for Out /// A Reader to get data from an input pub struct InReader<'a, T> { - data_reader: StreamReader<'a, T>, - tag_reader: StreamReader<'a, TagSlot>, + data_slice_1: &'a mut [MaybeUninit], + data_slice_2: &'a mut [MaybeUninit], + // data_reader: StreamReader<'a, T>, + // tag_reader: StreamReader<'a, TagSlot>, } /// A writer to send data to an output pub struct OutWriter<'a, T> { - data_writer: StreamWriter<'a, T>, - tag_writer: StreamWriter<'a, TagSlot>, + data_slice_1: &'a mut [MaybeUninit], + data_slice_2: &'a mut [MaybeUninit], + // data_writer: StreamWriter<'a, T>, + // tag_writer: StreamWriter<'a, TagSlot>, } /// Creates a stream that can then be used to link blocks @@ -176,13 +176,13 @@ impl In /// let reader = input.read(); /// let data = reader.pop(); /// ``` - pub fn read<'a>(&'a mut self) -> InReader<'a, T> + pub fn read<'a>(&'a mut self) -> InReader { - let data_reader = self.stream.as_mut().unwrap().read(); - let tag_reader = self.tag_stream.as_mut().unwrap().read(); + // let data_reader = self.stream.as_mut().unwrap().read(); + // let tag_reader = self.tag_stream.as_mut().unwrap().read(); InReader { - data_reader, - tag_reader, + // data_reader, + // tag_reader, } } } @@ -195,11 +195,11 @@ impl Out /// let writer = output.write(); /// writer.push((data, tag).into()); /// ``` - pub fn write<'a>(&'a mut self) -> OutWriter<'a, T> + pub fn write<'a>(&'a mut self) -> OutWriter { OutWriter { - data_writer: self.stream.as_mut().unwrap().write(), - tag_writer: self.tag_stream.as_mut().unwrap().write(), + // data_writer: self.stream.as_mut().unwrap().write(), + // tag_writer: self.tag_stream.as_mut().unwrap().write(), } } @@ -216,21 +216,22 @@ impl Out /// ``` pub fn push_iter>>(&mut self, mut iter: I) -> bool { - let writer = self.write(); - let len = writer.len(); - - for _ in 0..len - { - if let Some(elt) = iter.next() - { - let _ = writer.push(elt); - } - else - { - return false; - } - } - true + false + // let writer = self.write(); + // let len = writer.len(); + // + // for _ in 0..len + // { + // if let Some(elt) = iter.next() + // { + // let _ = writer.push(elt); + // } + // else + // { + // return false; + // } + // } + // true } /// Meta information @@ -241,99 +242,112 @@ impl Out } } -impl InReader<'_, T> -{ - /// Gets the amount of elements that are available - /// on the input. - pub fn len(&self) -> usize - { - self.data_reader.len() - } +// impl InReader +// { +// /// Gets the amount of elements that are available +// /// on the input. +// pub fn len(&self) -> usize +// { +// 0 +// //self.data_reader.len() +// } +// +// /// Returns true iif no elements are available on the input. +// pub fn is_empty(&self) -> bool +// { +// //self.len() == 0 +// true +// } +// +// /// Pops an element from the input. +// /// It is guaranteed to return `Some(data)` if +// /// if pop was called strictly less times than len +// pub fn pop(&self) -> Option> +// { +// None +// // let data = self.data_reader.pop_with_index(); +// // if let Some((data, index)) = data +// // { +// // let mut tag = None; +// // if self +// // .tag_reader +// // .peek(|t| t.position) +// // .is_some_and(|x| x == index) +// // { +// // tag = self.tag_reader.pop(); +// // } +// // Some((data, tag.map(|t| t.tag)).into()) +// // } +// // else +// // { +// // None +// // } +// } +// +// /// Pops an element from the input, discarding the tag. +// /// It is guaranteed to return `Some(data)` if +// /// if pop was called strictly less times than len +// pub fn pop_untag(&self) -> Option +// { +// None +// // self.pop().map(|data| data.into_inner()) +// } +// +// } +// +// impl OutWriter<'_, T> +// { +// /// Gets how much room is available on the output +// pub fn len(&self) -> usize +// { +// 0 +// //self.data_writer.len().min(self.tag_writer.len()) +// } +// +// /// Returns true iif no element can be sent +// pub fn is_empty(&self) -> bool +// { +// true +// //self.len() == 0 +// } +// +// /// Pushes some tagged data on the input. +// /// +// /// The operation succeeds (`Ok(())`) if there is enough room +// /// Or fails returning the given data to the caller. +// pub fn push(&self, data: Tagged) -> Result<(), Tagged> +// { +// Ok(()) +// // let (data, tag) = data.into(); +// // let position = self.data_writer.next_index(); +// // let tag = tag.map(|t| TagSlot { position, tag: t }); +// // +// // match self.data_writer.push(data) +// // { +// // Ok(_) if tag.is_some() => +// // { +// // let _ = self.tag_writer.push(tag.unwrap()); +// // Ok(()) +// // } +// // Ok(_) => Ok(()), +// // Err(data) => Err((data, tag.map(|t| t.tag)).into()), +// // } +// } +// +// /// Pushes some data on the input (not tagged). +// /// +// /// The operation succeeds (`Ok(())`) if there is enough room +// /// Or fails returning the given data to the caller. +// pub fn push_no_tag(&self, data: T) -> Result<(), T> +// { +// Ok(()) +// //self.data_writer.push(data) +// } +// } - /// Returns true iif no elements are available on the input. - pub fn is_empty(&self) -> bool - { - self.len() == 0 - } - - /// Pops an element from the input. - /// It is guaranteed to return `Some(data)` if - /// if pop was called strictly less times than len - pub fn pop(&self) -> Option> - { - let data = self.data_reader.pop_with_index(); - if let Some((data, index)) = data - { - let mut tag = None; - if self - .tag_reader - .peek(|t| t.position) - .is_some_and(|x| x == index) - { - tag = self.tag_reader.pop(); - } - Some((data, tag.map(|t| t.tag)).into()) - } - else - { - None - } - } - - /// Pops an element from the input, discarding the tag. - /// It is guaranteed to return `Some(data)` if - /// if pop was called strictly less times than len - pub fn pop_untag(&self) -> Option - { - self.pop().map(|data| data.into_inner()) - } -} - -impl OutWriter<'_, T> -{ - /// Gets how much room is available on the output - pub fn len(&self) -> usize - { - self.data_writer.len().min(self.tag_writer.len()) - } - - /// Returns true iif no element can be sent - pub fn is_empty(&self) -> bool - { - self.len() == 0 - } - - /// Pushes some tagged data on the input. - /// - /// The operation succeeds (`Ok(())`) if there is enough room - /// Or fails returning the given data to the caller. - pub fn push(&self, data: Tagged) -> Result<(), Tagged> - { - let (data, tag) = data.into(); - let position = self.data_writer.next_index(); - let tag = tag.map(|t| TagSlot { position, tag: t }); - - match self.data_writer.push(data) - { - Ok(_) if tag.is_some() => - { - let _ = self.tag_writer.push(tag.unwrap()); - Ok(()) - } - Ok(_) => Ok(()), - Err(data) => Err((data, tag.map(|t| t.tag)).into()), - } - } - - /// Pushes some data on the input (not tagged). - /// - /// The operation succeeds (`Ok(())`) if there is enough room - /// Or fails returning the given data to the caller. - pub fn push_no_tag(&self, data: T) -> Result<(), T> - { - self.data_writer.push(data) - } -} +// -------------------- +// Iterator facilites +// -------------------- /// An iterator type to push data to output(s) pub struct PopIter @@ -356,55 +370,55 @@ pub trait PopIterable<'a> fn pop_iter(&'a mut self) -> PopIter; } -impl<'a, T: 'static> PopIterable<'a> for In -{ - type Output = InReader<'a, T>; - fn pop_iter(&'a mut self) -> PopIter> - { - let reader = self.read(); - PopIter { - popped: 0, - len: reader.len(), - reader, - } - } -} +// impl<'a, T: 'static> PopIterable<'a> for In +// { +// type Output = InReader<'a, T>; +// fn pop_iter(&'a mut self) -> PopIter> +// { +// let reader = self.read(); +// PopIter { +// popped: 0, +// len: reader.len(), +// reader, +// } +// } +// } +// +// generate_pop_iterable_tuple_impl! {1} +// generate_pop_iterable_tuple_impl! {2} +// generate_pop_iterable_tuple_impl! {3} +// generate_pop_iterable_tuple_impl! {4} +// generate_pop_iterable_tuple_impl! {5} +// generate_pop_iterable_tuple_impl! {6} +// generate_pop_iterable_tuple_impl! {7} +// generate_pop_iterable_tuple_impl! {8} +// generate_pop_iterable_tuple_impl! {9} +// generate_pop_iterable_tuple_impl! {10} +// generate_pop_iterable_tuple_impl! {11} +// generate_pop_iterable_tuple_impl! {12} +// +// impl<'a, T> Iterator for PopIter> +// { +// type Item = Tagged; +// +// fn next(&mut self) -> Option +// { +// self.reader.pop() +// } +// } -generate_pop_iterable_tuple_impl! {1} -generate_pop_iterable_tuple_impl! {2} -generate_pop_iterable_tuple_impl! {3} -generate_pop_iterable_tuple_impl! {4} -generate_pop_iterable_tuple_impl! {5} -generate_pop_iterable_tuple_impl! {6} -generate_pop_iterable_tuple_impl! {7} -generate_pop_iterable_tuple_impl! {8} -generate_pop_iterable_tuple_impl! {9} -generate_pop_iterable_tuple_impl! {10} -generate_pop_iterable_tuple_impl! {11} -generate_pop_iterable_tuple_impl! {12} - -impl<'a, T> Iterator for PopIter> -{ - type Item = Tagged; - - fn next(&mut self) -> Option - { - self.reader.pop() - } -} - -impl_iterator_for_pop_iter_tuple! {1} -impl_iterator_for_pop_iter_tuple! {2} -impl_iterator_for_pop_iter_tuple! {3} -impl_iterator_for_pop_iter_tuple! {4} -impl_iterator_for_pop_iter_tuple! {5} -impl_iterator_for_pop_iter_tuple! {6} -impl_iterator_for_pop_iter_tuple! {7} -impl_iterator_for_pop_iter_tuple! {8} -impl_iterator_for_pop_iter_tuple! {9} -impl_iterator_for_pop_iter_tuple! {10} -impl_iterator_for_pop_iter_tuple! {11} -impl_iterator_for_pop_iter_tuple! {12} +// impl_iterator_for_pop_iter_tuple! {1} +// impl_iterator_for_pop_iter_tuple! {2} +// impl_iterator_for_pop_iter_tuple! {3} +// impl_iterator_for_pop_iter_tuple! {4} +// impl_iterator_for_pop_iter_tuple! {5} +// impl_iterator_for_pop_iter_tuple! {6} +// impl_iterator_for_pop_iter_tuple! {7} +// impl_iterator_for_pop_iter_tuple! {8} +// impl_iterator_for_pop_iter_tuple! {9} +// impl_iterator_for_pop_iter_tuple! {10} +// impl_iterator_for_pop_iter_tuple! {11} +// impl_iterator_for_pop_iter_tuple! {12} /// StreamProducer object for data and tags stored in a type /// agnostic/erased way. diff --git a/oxydsp-flowgraph/src/stream.rs b/oxydsp-flowgraph/src/stream.rs index b9acf27..c96be31 100644 --- a/oxydsp-flowgraph/src/stream.rs +++ b/oxydsp-flowgraph/src/stream.rs @@ -1,4 +1,3 @@ -use std::cell::Cell; use std::cell::UnsafeCell; use std::mem::MaybeUninit; use std::ops::Deref; @@ -53,37 +52,6 @@ unsafe impl Sync for StreamProducer {} unsafe impl Send for StreamConsumer {} unsafe impl Sync for StreamConsumer {} -// Represents a write operation within a stream producer -pub struct StreamWriter<'a, T> -{ - producer: &'a mut StreamProducer, - first: &'a UnsafeCell<[MaybeUninit]>, - second: Option<&'a UnsafeCell<[MaybeUninit]>>, - first_len: usize, - second_len: usize, - written: Cell, - - // Index of the first element to be pushed - // within the "infinite buffer" - // Used to number tags - start_index: usize, -} - -// Represents a read operation within a stream producer -pub struct StreamReader<'a, T> -{ - producer: &'a StreamConsumer, - first: &'a UnsafeCell<[MaybeUninit]>, - second: Option<&'a UnsafeCell<[MaybeUninit]>>, - first_len: usize, - second_len: usize, - read: Cell, - - // Index of the first element to be read - // within the "infinite buffer" - // Used to number tags - start_index: usize, -} pub fn bounded_queue(capacity: usize) -> (StreamProducer, StreamConsumer) { @@ -123,7 +91,22 @@ pub fn bounded_queue(capacity: usize) -> (StreamProducer, StreamConsumer StreamProducer { - pub fn write<'a>(&'a mut self) -> StreamWriter<'a, T> + pub fn produce(&mut self, written: usize) + { + // Advance head. + let head = self.inner.head.load(Ordering::Relaxed); + let tail = self.inner.tail.load(Ordering::Relaxed); + + // Check bounds + assert!(head + written - tail <= (self.inner.capacity_mask + 1)); + + // We want writes to the buffer to be visible when acquired in the pop side + self.inner + .head + .store(head + written, Ordering::Release); + } + + pub fn write(&mut self) -> (&mut [MaybeUninit], &mut [MaybeUninit]) { // We need to claim the maximum amount of elements. let tail = self.inner.tail.load(Ordering::Acquire); @@ -159,34 +142,9 @@ impl StreamProducer let (start_to_tail, _tail_to_head) = start_to_head.split_at_mut_unchecked(wrapped_tail); - // Slices are wrapped into unsafe cells to provide interior mutability - // On the stream as it is much more convienient. - // - // SAFETY: - // // This functions borrows the stream mutably. As such, only one instance - // of StreamWriter can exist for a given stream. The StreamWriter - // is thus the only on able to write or read the stream when it lives - let first_len = head_to_end.len(); - let second_len = start_to_tail.len(); - let first = std::mem::transmute::< - &mut [MaybeUninit], - &UnsafeCell<[MaybeUninit]>, - >(head_to_end); - let second = Some(std::mem::transmute::< - &mut [MaybeUninit], - &UnsafeCell<[MaybeUninit]>, - >(start_to_tail)); - StreamWriter { - start_index: head, - - producer: self, - first, - second, - first_len, - second_len, - written: 0.into(), - } + // of these slices can exist for a given stream. + (head_to_end, start_to_tail) } } else @@ -195,12 +153,6 @@ impl StreamProducer if wrapped_tail < wrapped_head { - // - // Or - // ▯▯▯▯▯▯▯▯▯▯▯▯▯ - // | - // tail & head - // (empty) // Current configuration : // ▯▯▯▮▮▮▮▮▮▯▯▯▯ // | | @@ -223,28 +175,7 @@ impl StreamProducer let (start_to_tail, _tail_to_head) = start_to_head.split_at_mut_unchecked(wrapped_tail); - let first_len = head_to_end.len(); - let second_len = start_to_tail.len(); - - let first = std::mem::transmute::< - &mut [MaybeUninit], - &UnsafeCell<[std::mem::MaybeUninit]>, - >(head_to_end); - let second = Some(std::mem::transmute::< - &mut [MaybeUninit], - &UnsafeCell<[MaybeUninit]>, - >(start_to_tail)); - - StreamWriter { - start_index: head, - - producer: self, - first, - second, - first_len, - second_len, - written: 0.into(), - } + (head_to_end, start_to_tail) } } else @@ -261,8 +192,8 @@ impl StreamProducer // | // tail & head // (full) - // ______.______ - // slice2 slice1 + // . + // slice1 // SAFETY: // @@ -272,20 +203,10 @@ impl StreamProducer // Head and tail are both indices of the slice unsafe { let k = &mut *self.inner.buffer.get(); - let len = wrapped_tail - wrapped_head; - StreamWriter { - start_index: head, - - producer: self, - first_len: len, - second_len: 0, - first: std::mem::transmute::< - &[MaybeUninit], - &UnsafeCell<[MaybeUninit]>, - >(&k[wrapped_head..wrapped_tail]), - second: None, - written: 0.into(), - } + let (_start_to_head, head_to_tail) = k.split_at_mut_unchecked(wrapped_head); + let (head_to_tail, empty_slice) = + head_to_tail.split_at_mut_unchecked(wrapped_tail - wrapped_head); + (head_to_tail, empty_slice) } } } @@ -294,7 +215,22 @@ impl StreamProducer impl StreamConsumer { - pub fn read<'a>(&'a mut self) -> StreamReader<'a, T> + pub fn consume(&mut self, read: usize) + { + // Advance head. + let head = self.inner.head.load(Ordering::Relaxed); + let tail = self.inner.tail.load(Ordering::Relaxed); + + // Check bounds + assert!(tail + read <= head); + + // We want writes to the buffer to be visible when acquired in the pop side + self.inner + .tail + .store(tail + read, Ordering::Release); + } + + pub fn read_uninit(&mut self) -> (&mut [MaybeUninit], &mut [MaybeUninit]) { // We need to claim the maximum amount of elements. let head = self.inner.head.load(Ordering::Acquire); @@ -311,19 +247,9 @@ impl StreamConsumer // Buffer is empty. Return empty slice unsafe { let k = &mut *self.inner.buffer.get(); - let len = wrapped_head - wrapped_tail; - StreamReader { - start_index: tail, - - producer: self, - first_len: len, - second_len: 0, - first: std::mem::transmute::<&[MaybeUninit], &UnsafeCell<[MaybeUninit]>>( - &k[wrapped_tail..wrapped_head], - ), - second: None, - read: 0.into(), - } + let head_to_tail = &mut k[wrapped_head..wrapped_tail]; + let (empty_1, empty_2) = head_to_tail.split_at_mut_unchecked(0); + (empty_1, empty_2) } } else @@ -346,21 +272,10 @@ impl StreamConsumer // // Head and tail are both indices of the slice unsafe { - let k = &mut *self.inner.buffer.get(); - let len = wrapped_head - wrapped_tail; - StreamReader { - start_index: tail, - - producer: self, - first_len: len, - second_len: 0, - first: std::mem::transmute::< - &[MaybeUninit], - &UnsafeCell<[MaybeUninit]>, - >(&k[wrapped_tail..wrapped_head]), - second: None, - read: 0.into(), - } + let k = &mut (&mut *self.inner.buffer.get())[wrapped_tail..wrapped_head]; + let (tail_to_head, empty_slice) = + k.split_at_mut_unchecked(wrapped_head - wrapped_tail); + (tail_to_head, empty_slice) } } else @@ -395,339 +310,114 @@ impl StreamConsumer let (start_to_head, _head_to_tail) = start_to_tail.split_at_mut_unchecked(wrapped_head); - let first_len = tail_to_end.len(); - let second_len = start_to_head.len(); - - let first = std::mem::transmute::< - &mut [MaybeUninit], - &UnsafeCell<[MaybeUninit]>, - >(tail_to_end); - let second = Some(std::mem::transmute::< - &mut [MaybeUninit], - &UnsafeCell<[MaybeUninit]>, - >(start_to_head)); - - StreamReader { - start_index: tail, - - producer: self, - first, - second, - first_len, - second_len, - read: 0.into(), - } + (tail_to_end, start_to_head) } } } } - /// Creates a reader of contiguous elements that - /// satisfy the predicate - pub fn read_while(&mut self, predicate: F) -> StreamReader<'_, T> - where - F: Fn(&T) -> bool, - { - // Take a normal reader. This contains available elements to read. - let mut reader = self.read(); - - // We need to trim the slices to keep only the satified elements - - // First slice - let mut first_kept = 0; - // SAFETY: - // - // Only us can have a reference to these slices of the buffer - for element in unsafe { &*reader.first.get() } - { - // SAFETY - // - // If this element is in a reader returned by self.read - // with no pop called, we know it is initialized - let init_element = unsafe { element.assume_init_ref() }; - let sat = predicate(init_element); - if !sat - { - // Stop here - // Forget about second slice - reader.second_len = 0; - reader.second = None; - - // Trim first slice - reader.first_len = first_kept; - unsafe { - reader.first = std::mem::transmute::< - &[MaybeUninit], - &UnsafeCell<[MaybeUninit]>, - >(&(&*reader.first.get())[0..first_kept]); - } - - return reader; - } - first_kept += 1; - } - - // If we are here, all of the elements of the first slice, satisfy the predicate - - if let Some(second_slice) = &mut reader.second - { - // Second slice - let mut second_kept = 0; - // SAFETY: - // - // Only us can have a reference to these slices of the buffer - for element in unsafe { &*second_slice.get() } - { - // SAFETY - // - // If this element is in a reader returned by self.read - // with no pop called, we know it is initialized - let init_element = unsafe { element.assume_init_ref() }; - let sat = predicate(init_element); - if !sat - { - // Stop here - // Trim second slice - reader.second_len = second_kept; - unsafe { - reader.second = Some(std::mem::transmute::< - &[MaybeUninit], - &UnsafeCell<[MaybeUninit]>, - >( - &(&*second_slice.get())[0..first_kept] - )); - } - return reader; - } - second_kept += 1; - } - } - - return reader; - } + // Creates a reader of contiguous elements that + // satisfy the predicate + // pub fn read_while(&mut self, predicate: F) -> StreamReader<'_, T> + // where + // F: Fn(&T) -> bool, + // { + // // Take a normal reader. This contains available elements to read. + // let mut reader = self.read(); + // + // // We need to trim the slices to keep only the satified elements + // + // // First slice + // let mut first_kept = 0; + // // SAFETY: + // // + // // Only us can have a reference to these slices of the buffer + // for element in unsafe { &*reader.first.get() } + // { + // // SAFETY + // // + // // If this element is in a reader returned by self.read + // // with no pop called, we know it is initialized + // let init_element = unsafe { element.assume_init_ref() }; + // let sat = predicate(init_element); + // if !sat + // { + // // Stop here + // // Forget about second slice + // reader.second_len = 0; + // reader.second = None; + // + // // Trim first slice + // reader.first_len = first_kept; + // unsafe { + // reader.first = std::mem::transmute::< + // &[MaybeUninit], + // &UnsafeCell<[MaybeUninit]>, + // >(&(&*reader.first.get())[0..first_kept]); + // } + // + // return reader; + // } + // first_kept += 1; + // } + // + // // If we are here, all of the elements of the first slice, satisfy the predicate + // + // if let Some(second_slice) = &mut reader.second + // { + // // Second slice + // let mut second_kept = 0; + // // SAFETY: + // // + // // Only us can have a reference to these slices of the buffer + // for element in unsafe { &*second_slice.get() } + // { + // // SAFETY + // // + // // If this element is in a reader returned by self.read + // // with no pop called, we know it is initialized + // let init_element = unsafe { element.assume_init_ref() }; + // let sat = predicate(init_element); + // if !sat + // { + // // Stop here + // // Trim second slice + // reader.second_len = second_kept; + // unsafe { + // reader.second = Some(std::mem::transmute::< + // &[MaybeUninit], + // &UnsafeCell<[MaybeUninit]>, + // >( + // &(&*second_slice.get())[0..first_kept] + // )); + // } + // return reader; + // } + // second_kept += 1; + // } + // } + // + // return reader; + // } } -impl<'a, T> StreamWriter<'a, T> -{ - pub fn len(&self) -> usize +impl StreamConsumer +{ + pub fn read(&mut self) -> (&[T], &[T]) { - self.first_len + self.second_len - } - - pub fn is_empty(&self) -> bool - { - self.len() == 0 - } - - pub fn next_index(&self) -> usize - { - self.start_index + self.written.get() - } - - pub fn push(&self, element: T) -> Result<(), T> - { - if self.written.get() < self.first_len + let (slice_1, slice_2) = self.read_uninit(); + unsafe { - unsafe { - (&mut *self.first.get())[self.written.get()] = MaybeUninit::new(element); - } - self.written.set(self.written.get() + 1); - Ok(()) + (std::mem::transmute(slice_1), std::mem::transmute(slice_2)) } - else if let Some(second) = &self.second - && self.written.get() - self.first_len < self.second_len - { - unsafe { - (&mut *second.get())[self.written.get() - self.first_len] = - MaybeUninit::new(element); - } - self.written.set(self.written.get() + 1); - Ok(()) - } - else - { - Err(element) - } - } - pub fn write(&self, length: usize) - { - let new = self.written.get() + length; - assert!(new < self.len()); - self.written.set(new); - } -} - -impl<'a, T: Copy> StreamWriter<'a, T> -{ - pub fn slices_mut(&mut self) -> (&mut [MaybeUninit], &mut [MaybeUninit]) - { - unsafe { - ( - &mut *self.first.get(), - self.second - .map(|x| &mut *x.get()) - .unwrap_or_else(|| &mut(&mut *self.first.get())[0..0]), - ) - } - } -} - -impl<'a, T> StreamReader<'a, T> -{ - pub fn len(&self) -> usize - { - self.first_len + self.second_len - } - - pub fn is_empty(&self) -> bool - { - self.len() == 0 - } - - pub fn last_index(&self) -> usize - { - self.start_index + self.len() - } - - pub fn next_index(&self) -> usize - { - self.start_index + self.read.get() - } - - pub fn pop_with_index(&self) -> Option<(T, usize)> - { - let index = self.next_index(); - self.pop().map(|t| (t, index)) - } - - pub fn peek(&self, peeker: F) -> Option - where - F: Fn(&T) -> O, - { - // Same as pop, without taking, or increasing read count - if self.read.get() < self.first_len - { - // SAFETY: - // - // If element is in this slice, it is initialized. - // We take it once since read increases - let element = unsafe { (&mut *self.first.get())[self.read.get()].assume_init_ref() }; - Some(peeker(element)) - } - else if let Some(second) = &self.second - && self.read.get() - self.first_len < self.second_len - { - let element = - unsafe { (&mut *second.get())[self.read.get() - self.first_len].assume_init_ref() }; - Some(peeker(element)) - } - else - { - None - } - } - - pub fn pop(&self) -> Option - { - if self.read.get() < self.first_len - { - // SAFETY: - // - // If element is in this slice, it is initialized. - // We take it once since read increases - let element = unsafe { - std::mem::replace( - &mut (&mut *self.first.get())[self.read.get()], - MaybeUninit::uninit(), - ) - .assume_init() - }; - self.read.set(self.read.get() + 1); - Some(element) - } - else if let Some(second) = &self.second - && self.read.get() - self.first_len < self.second_len - { - let element = unsafe { - std::mem::replace( - &mut (&mut *second.get())[self.read.get() - self.first_len], - MaybeUninit::uninit(), - ) - .assume_init() - }; - self.read.set(self.read.get() + 1); - Some(element) - } - else - { - None - } - } - - pub fn read(&self, length: usize) - { - let new = self.read.get() + length; - assert!(new < self.len()); - self.read.set(new); - } -} - -impl<'a, T: Copy> StreamReader<'a, T> -{ - pub fn slices(&self) -> (&[T], &[T]) - { - unsafe { - ( - std::mem::transmute::<&[MaybeUninit], &[T]>(&*self.first.get()), - std::mem::transmute::<&[MaybeUninit], &[T]>( - self.second - .map(|x| &*x.get()) - .unwrap_or_else(|| &(&*self.first.get())[0..0]), - ), - ) - } - } -} - -// When a Stream writer goes out of scope, it wrote -// some things into the stream. These things need to be commited to the queue -impl<'a, T> Drop for StreamWriter<'a, T> -{ - fn drop(&mut self) - { - // Advance head. - // We know that this value hasn't changed since this StreamWriter was created - // let head = self.producer.inner.head.load(Ordering::Relaxed); - - // We want writes to the buffer to be visible when acquired in the pop side - self.producer - .inner - .head - .store(self.start_index + self.written.get(), Ordering::Release); - } -} - -// When a Stream reader goes out of scope, it took -// some things from the stream. These things need to be de-commited to the queue -impl<'a, T> Drop for StreamReader<'a, T> -{ - fn drop(&mut self) - { - // Advance tail. - // We know that this value hasn't changed since this StreamWriter was created - // let tail = self.producer.inner.tail.load(Ordering::Relaxed); - - // We want writes to the buffer to be visible when acquired in the push side - self.producer - .inner - .tail - .store(self.start_index + self.read.get(), Ordering::Release); } } mod test { + use std::mem::MaybeUninit; + #[allow(unused_imports)] use crate::stream::bounded_queue; @@ -738,57 +428,66 @@ mod test let (mut tx, mut rx) = bounded_queue::(4); { - let writer = tx.write(); + let (a, b) = tx.write(); + assert_eq!(a.len(), 4); + assert_eq!(b.len(), 0); - assert_eq!(writer.len(), 4); + a[0] = MaybeUninit::new(0); + a[1] = MaybeUninit::new(1); + a[2] = MaybeUninit::new(2); + a[3] = MaybeUninit::new(3); - assert_eq!(writer.push(1), Ok(())); - assert_eq!(writer.push(2), Ok(())); - assert_eq!(writer.push(3), Ok(())); - assert_eq!(writer.push(4), Ok(())); - assert_eq!(writer.push(5), Err(5)); + tx.produce(4); } { - let reader = rx.read(); + let (a, b) = rx.read(); + assert_eq!(a.len(), 4); + assert_eq!(b.len(), 0); - assert_eq!(reader.len(), 4); + assert_eq!(a[0], 0); + assert_eq!(a[1], 1); + assert_eq!(a[2], 2); + assert_eq!(a[3], 3); - assert_eq!(reader.pop(), Some(1)); - assert_eq!(reader.pop(), Some(2)); - assert_eq!(reader.pop(), Some(3)); - assert_eq!(reader.pop(), Some(4)); - assert_eq!(reader.pop(), None); + rx.consume(4); } - // Put stream into weird situatino + // Put stream into weird situation { - let writer = tx.write(); - assert_eq!(writer.push(1), Ok(())); - assert_eq!(writer.push(2), Ok(())); - assert_eq!(writer.push(3), Ok(())); - assert_eq!(writer.push(4), Ok(())); + let (a, b) = tx.write(); + assert_eq!(a.len(), 4); + assert_eq!(b.len(), 0); + + a[0] = MaybeUninit::new(0); + a[1] = MaybeUninit::new(1); + a[2] = MaybeUninit::new(2); + + tx.produce(3); } { - let reader = rx.read(); - assert_eq!(reader.pop(), Some(1)); - assert_eq!(reader.pop(), Some(2)); + let (a, b) = rx.read(); + assert_eq!(a.len(), 3); + assert_eq!(b.len(), 0); + + assert_eq!(a[0], 0); + assert_eq!(a[1], 1); + assert_eq!(a[2], 2); + + rx.consume(1); } { - let writer = tx.write(); - assert_eq!(writer.len(), 2); - assert_eq!(writer.push(5), Ok(())); - assert_eq!(writer.push(6), Ok(())); + let (a, b) = tx.write(); + assert_eq!(a.len(), 1); + assert_eq!(b.len(), 1); } { - let reader = rx.read(); - assert_eq!(reader.pop(), Some(3)); - assert_eq!(reader.pop(), Some(4)); - assert_eq!(reader.pop(), Some(5)); - assert_eq!(reader.pop(), Some(6)); + let (a, b) = rx.read(); + assert_eq!(a.len(), 2); + assert_eq!(b.len(), 0); } } }