Starts stream rework

This commit is contained in:
2026-04-09 20:30:05 +02:00
parent 89ff2827ff
commit 81cac2f239
2 changed files with 367 additions and 654 deletions

View File

@ -1,14 +1,10 @@
use std::any::Any;
use std::mem::MaybeUninit;
use std::sync::Arc;
use std::sync::Mutex;
use oxydsp_flowgraph_macros::generate_pop_iterable_tuple_impl;
use oxydsp_flowgraph_macros::impl_iterator_for_pop_iter_tuple;
use crate::stream::StreamConsumer;
use crate::stream::StreamProducer;
use crate::stream::StreamReader;
use crate::stream::StreamWriter;
use crate::stream::{self};
use crate::tag::TagSlot;
use crate::tag::Tagged;
@ -130,15 +126,19 @@ impl<T: 'static> AnonymousOut for Out<T>
/// A Reader to get data from an input
pub struct InReader<'a, T>
{
data_reader: StreamReader<'a, T>,
tag_reader: StreamReader<'a, TagSlot>,
data_slice_1: &'a mut [MaybeUninit<T>],
data_slice_2: &'a mut [MaybeUninit<T>],
// data_reader: StreamReader<'a, T>,
// tag_reader: StreamReader<'a, TagSlot>,
}
/// A writer to send data to an output
pub struct OutWriter<'a, T>
{
data_writer: StreamWriter<'a, T>,
tag_writer: StreamWriter<'a, TagSlot>,
data_slice_1: &'a mut [MaybeUninit<T>],
data_slice_2: &'a mut [MaybeUninit<T>],
// data_writer: StreamWriter<'a, T>,
// tag_writer: StreamWriter<'a, TagSlot>,
}
/// Creates a stream that can then be used to link blocks
@ -176,13 +176,13 @@ impl<T: 'static> In<T>
/// let reader = input.read();
/// let data = reader.pop();
/// ```
pub fn read<'a>(&'a mut self) -> InReader<'a, T>
pub fn read<'a>(&'a mut self) -> InReader
{
let data_reader = self.stream.as_mut().unwrap().read();
let tag_reader = self.tag_stream.as_mut().unwrap().read();
// let data_reader = self.stream.as_mut().unwrap().read();
// let tag_reader = self.tag_stream.as_mut().unwrap().read();
InReader {
data_reader,
tag_reader,
// data_reader,
// tag_reader,
}
}
}
@ -195,11 +195,11 @@ impl<T: 'static> Out<T>
/// let writer = output.write();
/// writer.push((data, tag).into());
/// ```
pub fn write<'a>(&'a mut self) -> OutWriter<'a, T>
pub fn write<'a>(&'a mut self) -> OutWriter
{
OutWriter {
data_writer: self.stream.as_mut().unwrap().write(),
tag_writer: self.tag_stream.as_mut().unwrap().write(),
// data_writer: self.stream.as_mut().unwrap().write(),
// tag_writer: self.tag_stream.as_mut().unwrap().write(),
}
}
@ -216,21 +216,22 @@ impl<T: 'static> Out<T>
/// ```
pub fn push_iter<I: Iterator<Item = Tagged<T>>>(&mut self, mut iter: I) -> bool
{
let writer = self.write();
let len = writer.len();
for _ in 0..len
{
if let Some(elt) = iter.next()
{
let _ = writer.push(elt);
}
else
{
return false;
}
}
true
false
// let writer = self.write();
// let len = writer.len();
//
// for _ in 0..len
// {
// if let Some(elt) = iter.next()
// {
// let _ = writer.push(elt);
// }
// else
// {
// return false;
// }
// }
// true
}
/// Meta information
@ -241,99 +242,112 @@ impl<T: 'static> Out<T>
}
}
impl<T> InReader<'_, T>
{
/// Gets the amount of elements that are available
/// on the input.
pub fn len(&self) -> usize
{
self.data_reader.len()
}
// impl InReader
// {
// /// Gets the amount of elements that are available
// /// on the input.
// pub fn len(&self) -> usize
// {
// 0
// //self.data_reader.len()
// }
//
// /// Returns true iif no elements are available on the input.
// pub fn is_empty(&self) -> bool
// {
// //self.len() == 0
// true
// }
//
// /// Pops an element from the input.
// /// It is guaranteed to return `Some(data)` if
// /// if pop was called strictly less times than len
// pub fn pop(&self) -> Option<Tagged<T>>
// {
// None
// // let data = self.data_reader.pop_with_index();
// // if let Some((data, index)) = data
// // {
// // let mut tag = None;
// // if self
// // .tag_reader
// // .peek(|t| t.position)
// // .is_some_and(|x| x == index)
// // {
// // tag = self.tag_reader.pop();
// // }
// // Some((data, tag.map(|t| t.tag)).into())
// // }
// // else
// // {
// // None
// // }
// }
//
// /// Pops an element from the input, discarding the tag.
// /// It is guaranteed to return `Some(data)` if
// /// if pop was called strictly less times than len
// pub fn pop_untag(&self) -> Option<T>
// {
// None
// // self.pop().map(|data| data.into_inner())
// }
//
// }
//
// impl<T> OutWriter<'_, T>
// {
// /// Gets how much room is available on the output
// pub fn len(&self) -> usize
// {
// 0
// //self.data_writer.len().min(self.tag_writer.len())
// }
//
// /// Returns true iif no element can be sent
// pub fn is_empty(&self) -> bool
// {
// true
// //self.len() == 0
// }
//
// /// Pushes some tagged data on the input.
// ///
// /// The operation succeeds (`Ok(())`) if there is enough room
// /// Or fails returning the given data to the caller.
// pub fn push(&self, data: Tagged<T>) -> Result<(), Tagged<T>>
// {
// Ok(())
// // let (data, tag) = data.into();
// // let position = self.data_writer.next_index();
// // let tag = tag.map(|t| TagSlot { position, tag: t });
// //
// // match self.data_writer.push(data)
// // {
// // Ok(_) if tag.is_some() =>
// // {
// // let _ = self.tag_writer.push(tag.unwrap());
// // Ok(())
// // }
// // Ok(_) => Ok(()),
// // Err(data) => Err((data, tag.map(|t| t.tag)).into()),
// // }
// }
//
// /// Pushes some data on the input (not tagged).
// ///
// /// The operation succeeds (`Ok(())`) if there is enough room
// /// Or fails returning the given data to the caller.
// pub fn push_no_tag(&self, data: T) -> Result<(), T>
// {
// Ok(())
// //self.data_writer.push(data)
// }
// }
/// Returns true iif no elements are available on the input.
pub fn is_empty(&self) -> bool
{
self.len() == 0
}
/// Pops an element from the input.
/// It is guaranteed to return `Some(data)` if
/// if pop was called strictly less times than len
pub fn pop(&self) -> Option<Tagged<T>>
{
let data = self.data_reader.pop_with_index();
if let Some((data, index)) = data
{
let mut tag = None;
if self
.tag_reader
.peek(|t| t.position)
.is_some_and(|x| x == index)
{
tag = self.tag_reader.pop();
}
Some((data, tag.map(|t| t.tag)).into())
}
else
{
None
}
}
/// Pops an element from the input, discarding the tag.
/// It is guaranteed to return `Some(data)` if
/// if pop was called strictly less times than len
pub fn pop_untag(&self) -> Option<T>
{
self.pop().map(|data| data.into_inner())
}
}
impl<T> OutWriter<'_, T>
{
/// Gets how much room is available on the output
pub fn len(&self) -> usize
{
self.data_writer.len().min(self.tag_writer.len())
}
/// Returns true iif no element can be sent
pub fn is_empty(&self) -> bool
{
self.len() == 0
}
/// Pushes some tagged data on the input.
///
/// The operation succeeds (`Ok(())`) if there is enough room
/// Or fails returning the given data to the caller.
pub fn push(&self, data: Tagged<T>) -> Result<(), Tagged<T>>
{
let (data, tag) = data.into();
let position = self.data_writer.next_index();
let tag = tag.map(|t| TagSlot { position, tag: t });
match self.data_writer.push(data)
{
Ok(_) if tag.is_some() =>
{
let _ = self.tag_writer.push(tag.unwrap());
Ok(())
}
Ok(_) => Ok(()),
Err(data) => Err((data, tag.map(|t| t.tag)).into()),
}
}
/// Pushes some data on the input (not tagged).
///
/// The operation succeeds (`Ok(())`) if there is enough room
/// Or fails returning the given data to the caller.
pub fn push_no_tag(&self, data: T) -> Result<(), T>
{
self.data_writer.push(data)
}
}
// --------------------
// Iterator facilites
// --------------------
/// An iterator type to push data to output(s)
pub struct PopIter<T>
@ -356,55 +370,55 @@ pub trait PopIterable<'a>
fn pop_iter(&'a mut self) -> PopIter<Self::Output>;
}
impl<'a, T: 'static> PopIterable<'a> for In<T>
{
type Output = InReader<'a, T>;
fn pop_iter(&'a mut self) -> PopIter<InReader<'a, T>>
{
let reader = self.read();
PopIter {
popped: 0,
len: reader.len(),
reader,
}
}
}
// impl<'a, T: 'static> PopIterable<'a> for In<T>
// {
// type Output = InReader<'a, T>;
// fn pop_iter(&'a mut self) -> PopIter<InReader<'a, T>>
// {
// let reader = self.read();
// PopIter {
// popped: 0,
// len: reader.len(),
// reader,
// }
// }
// }
//
// generate_pop_iterable_tuple_impl! {1}
// generate_pop_iterable_tuple_impl! {2}
// generate_pop_iterable_tuple_impl! {3}
// generate_pop_iterable_tuple_impl! {4}
// generate_pop_iterable_tuple_impl! {5}
// generate_pop_iterable_tuple_impl! {6}
// generate_pop_iterable_tuple_impl! {7}
// generate_pop_iterable_tuple_impl! {8}
// generate_pop_iterable_tuple_impl! {9}
// generate_pop_iterable_tuple_impl! {10}
// generate_pop_iterable_tuple_impl! {11}
// generate_pop_iterable_tuple_impl! {12}
//
// impl<'a, T> Iterator for PopIter<InReader<'a, T>>
// {
// type Item = Tagged<T>;
//
// fn next(&mut self) -> Option<Self::Item>
// {
// self.reader.pop()
// }
// }
generate_pop_iterable_tuple_impl! {1}
generate_pop_iterable_tuple_impl! {2}
generate_pop_iterable_tuple_impl! {3}
generate_pop_iterable_tuple_impl! {4}
generate_pop_iterable_tuple_impl! {5}
generate_pop_iterable_tuple_impl! {6}
generate_pop_iterable_tuple_impl! {7}
generate_pop_iterable_tuple_impl! {8}
generate_pop_iterable_tuple_impl! {9}
generate_pop_iterable_tuple_impl! {10}
generate_pop_iterable_tuple_impl! {11}
generate_pop_iterable_tuple_impl! {12}
impl<'a, T> Iterator for PopIter<InReader<'a, T>>
{
type Item = Tagged<T>;
fn next(&mut self) -> Option<Self::Item>
{
self.reader.pop()
}
}
impl_iterator_for_pop_iter_tuple! {1}
impl_iterator_for_pop_iter_tuple! {2}
impl_iterator_for_pop_iter_tuple! {3}
impl_iterator_for_pop_iter_tuple! {4}
impl_iterator_for_pop_iter_tuple! {5}
impl_iterator_for_pop_iter_tuple! {6}
impl_iterator_for_pop_iter_tuple! {7}
impl_iterator_for_pop_iter_tuple! {8}
impl_iterator_for_pop_iter_tuple! {9}
impl_iterator_for_pop_iter_tuple! {10}
impl_iterator_for_pop_iter_tuple! {11}
impl_iterator_for_pop_iter_tuple! {12}
// impl_iterator_for_pop_iter_tuple! {1}
// impl_iterator_for_pop_iter_tuple! {2}
// impl_iterator_for_pop_iter_tuple! {3}
// impl_iterator_for_pop_iter_tuple! {4}
// impl_iterator_for_pop_iter_tuple! {5}
// impl_iterator_for_pop_iter_tuple! {6}
// impl_iterator_for_pop_iter_tuple! {7}
// impl_iterator_for_pop_iter_tuple! {8}
// impl_iterator_for_pop_iter_tuple! {9}
// impl_iterator_for_pop_iter_tuple! {10}
// impl_iterator_for_pop_iter_tuple! {11}
// impl_iterator_for_pop_iter_tuple! {12}
/// StreamProducer object for data and tags stored in a type
/// agnostic/erased way.

View File

@ -1,4 +1,3 @@
use std::cell::Cell;
use std::cell::UnsafeCell;
use std::mem::MaybeUninit;
use std::ops::Deref;
@ -53,37 +52,6 @@ unsafe impl<T: Send> Sync for StreamProducer<T> {}
unsafe impl<T: Send> Send for StreamConsumer<T> {}
unsafe impl<T: Send> Sync for StreamConsumer<T> {}
// Represents a write operation within a stream producer
pub struct StreamWriter<'a, T>
{
producer: &'a mut StreamProducer<T>,
first: &'a UnsafeCell<[MaybeUninit<T>]>,
second: Option<&'a UnsafeCell<[MaybeUninit<T>]>>,
first_len: usize,
second_len: usize,
written: Cell<usize>,
// Index of the first element to be pushed
// within the "infinite buffer"
// Used to number tags
start_index: usize,
}
// Represents a read operation within a stream producer
pub struct StreamReader<'a, T>
{
producer: &'a StreamConsumer<T>,
first: &'a UnsafeCell<[MaybeUninit<T>]>,
second: Option<&'a UnsafeCell<[MaybeUninit<T>]>>,
first_len: usize,
second_len: usize,
read: Cell<usize>,
// Index of the first element to be read
// within the "infinite buffer"
// Used to number tags
start_index: usize,
}
pub fn bounded_queue<T>(capacity: usize) -> (StreamProducer<T>, StreamConsumer<T>)
{
@ -123,7 +91,22 @@ pub fn bounded_queue<T>(capacity: usize) -> (StreamProducer<T>, StreamConsumer<T
impl<T> StreamProducer<T>
{
pub fn write<'a>(&'a mut self) -> StreamWriter<'a, T>
pub fn produce(&mut self, written: usize)
{
// Advance head.
let head = self.inner.head.load(Ordering::Relaxed);
let tail = self.inner.tail.load(Ordering::Relaxed);
// Check bounds
assert!(head + written - tail <= (self.inner.capacity_mask + 1));
// We want writes to the buffer to be visible when acquired in the pop side
self.inner
.head
.store(head + written, Ordering::Release);
}
pub fn write(&mut self) -> (&mut [MaybeUninit<T>], &mut [MaybeUninit<T>])
{
// We need to claim the maximum amount of elements.
let tail = self.inner.tail.load(Ordering::Acquire);
@ -159,34 +142,9 @@ impl<T> StreamProducer<T>
let (start_to_tail, _tail_to_head) =
start_to_head.split_at_mut_unchecked(wrapped_tail);
// Slices are wrapped into unsafe cells to provide interior mutability
// On the stream as it is much more convienient.
//
// SAFETY:
//
// This functions borrows the stream mutably. As such, only one instance
// of StreamWriter can exist for a given stream. The StreamWriter
// is thus the only on able to write or read the stream when it lives
let first_len = head_to_end.len();
let second_len = start_to_tail.len();
let first = std::mem::transmute::<
&mut [MaybeUninit<T>],
&UnsafeCell<[MaybeUninit<T>]>,
>(head_to_end);
let second = Some(std::mem::transmute::<
&mut [MaybeUninit<T>],
&UnsafeCell<[MaybeUninit<T>]>,
>(start_to_tail));
StreamWriter {
start_index: head,
producer: self,
first,
second,
first_len,
second_len,
written: 0.into(),
}
// of these slices can exist for a given stream.
(head_to_end, start_to_tail)
}
}
else
@ -195,12 +153,6 @@ impl<T> StreamProducer<T>
if wrapped_tail < wrapped_head
{
//
// Or
// ▯▯▯▯▯▯▯▯▯▯▯▯▯
// |
// tail & head
// (empty)
// Current configuration :
// ▯▯▯▮▮▮▮▮▮▯▯▯▯
// | |
@ -223,28 +175,7 @@ impl<T> StreamProducer<T>
let (start_to_tail, _tail_to_head) =
start_to_head.split_at_mut_unchecked(wrapped_tail);
let first_len = head_to_end.len();
let second_len = start_to_tail.len();
let first = std::mem::transmute::<
&mut [MaybeUninit<T>],
&UnsafeCell<[std::mem::MaybeUninit<T>]>,
>(head_to_end);
let second = Some(std::mem::transmute::<
&mut [MaybeUninit<T>],
&UnsafeCell<[MaybeUninit<T>]>,
>(start_to_tail));
StreamWriter {
start_index: head,
producer: self,
first,
second,
first_len,
second_len,
written: 0.into(),
}
(head_to_end, start_to_tail)
}
}
else
@ -261,8 +192,8 @@ impl<T> StreamProducer<T>
// |
// tail & head
// (full)
// ______.______
// slice2 slice1
// .
// slice1
// SAFETY:
//
@ -272,20 +203,10 @@ impl<T> StreamProducer<T>
// Head and tail are both indices of the slice
unsafe {
let k = &mut *self.inner.buffer.get();
let len = wrapped_tail - wrapped_head;
StreamWriter {
start_index: head,
producer: self,
first_len: len,
second_len: 0,
first: std::mem::transmute::<
&[MaybeUninit<T>],
&UnsafeCell<[MaybeUninit<T>]>,
>(&k[wrapped_head..wrapped_tail]),
second: None,
written: 0.into(),
}
let (_start_to_head, head_to_tail) = k.split_at_mut_unchecked(wrapped_head);
let (head_to_tail, empty_slice) =
head_to_tail.split_at_mut_unchecked(wrapped_tail - wrapped_head);
(head_to_tail, empty_slice)
}
}
}
@ -294,7 +215,22 @@ impl<T> StreamProducer<T>
impl<T> StreamConsumer<T>
{
pub fn read<'a>(&'a mut self) -> StreamReader<'a, T>
pub fn consume(&mut self, read: usize)
{
// Advance head.
let head = self.inner.head.load(Ordering::Relaxed);
let tail = self.inner.tail.load(Ordering::Relaxed);
// Check bounds
assert!(tail + read <= head);
// We want writes to the buffer to be visible when acquired in the pop side
self.inner
.tail
.store(tail + read, Ordering::Release);
}
pub fn read_uninit(&mut self) -> (&mut [MaybeUninit<T>], &mut [MaybeUninit<T>])
{
// We need to claim the maximum amount of elements.
let head = self.inner.head.load(Ordering::Acquire);
@ -311,19 +247,9 @@ impl<T> StreamConsumer<T>
// Buffer is empty. Return empty slice
unsafe {
let k = &mut *self.inner.buffer.get();
let len = wrapped_head - wrapped_tail;
StreamReader {
start_index: tail,
producer: self,
first_len: len,
second_len: 0,
first: std::mem::transmute::<&[MaybeUninit<T>], &UnsafeCell<[MaybeUninit<T>]>>(
&k[wrapped_tail..wrapped_head],
),
second: None,
read: 0.into(),
}
let head_to_tail = &mut k[wrapped_head..wrapped_tail];
let (empty_1, empty_2) = head_to_tail.split_at_mut_unchecked(0);
(empty_1, empty_2)
}
}
else
@ -346,21 +272,10 @@ impl<T> StreamConsumer<T>
//
// Head and tail are both indices of the slice
unsafe {
let k = &mut *self.inner.buffer.get();
let len = wrapped_head - wrapped_tail;
StreamReader {
start_index: tail,
producer: self,
first_len: len,
second_len: 0,
first: std::mem::transmute::<
&[MaybeUninit<T>],
&UnsafeCell<[MaybeUninit<T>]>,
>(&k[wrapped_tail..wrapped_head]),
second: None,
read: 0.into(),
}
let k = &mut (&mut *self.inner.buffer.get())[wrapped_tail..wrapped_head];
let (tail_to_head, empty_slice) =
k.split_at_mut_unchecked(wrapped_head - wrapped_tail);
(tail_to_head, empty_slice)
}
}
else
@ -395,339 +310,114 @@ impl<T> StreamConsumer<T>
let (start_to_head, _head_to_tail) =
start_to_tail.split_at_mut_unchecked(wrapped_head);
let first_len = tail_to_end.len();
let second_len = start_to_head.len();
let first = std::mem::transmute::<
&mut [MaybeUninit<T>],
&UnsafeCell<[MaybeUninit<T>]>,
>(tail_to_end);
let second = Some(std::mem::transmute::<
&mut [MaybeUninit<T>],
&UnsafeCell<[MaybeUninit<T>]>,
>(start_to_head));
StreamReader {
start_index: tail,
producer: self,
first,
second,
first_len,
second_len,
read: 0.into(),
}
(tail_to_end, start_to_head)
}
}
}
}
/// Creates a reader of contiguous elements that
/// satisfy the predicate
pub fn read_while<F>(&mut self, predicate: F) -> StreamReader<'_, T>
where
F: Fn(&T) -> bool,
{
// Take a normal reader. This contains available elements to read.
let mut reader = self.read();
// We need to trim the slices to keep only the satified elements
// First slice
let mut first_kept = 0;
// SAFETY:
//
// Only us can have a reference to these slices of the buffer
for element in unsafe { &*reader.first.get() }
{
// SAFETY
//
// If this element is in a reader returned by self.read
// with no pop called, we know it is initialized
let init_element = unsafe { element.assume_init_ref() };
let sat = predicate(init_element);
if !sat
{
// Stop here
// Forget about second slice
reader.second_len = 0;
reader.second = None;
// Trim first slice
reader.first_len = first_kept;
unsafe {
reader.first = std::mem::transmute::<
&[MaybeUninit<T>],
&UnsafeCell<[MaybeUninit<T>]>,
>(&(&*reader.first.get())[0..first_kept]);
}
return reader;
}
first_kept += 1;
}
// If we are here, all of the elements of the first slice, satisfy the predicate
if let Some(second_slice) = &mut reader.second
{
// Second slice
let mut second_kept = 0;
// SAFETY:
//
// Only us can have a reference to these slices of the buffer
for element in unsafe { &*second_slice.get() }
{
// SAFETY
//
// If this element is in a reader returned by self.read
// with no pop called, we know it is initialized
let init_element = unsafe { element.assume_init_ref() };
let sat = predicate(init_element);
if !sat
{
// Stop here
// Trim second slice
reader.second_len = second_kept;
unsafe {
reader.second = Some(std::mem::transmute::<
&[MaybeUninit<T>],
&UnsafeCell<[MaybeUninit<T>]>,
>(
&(&*second_slice.get())[0..first_kept]
));
}
return reader;
}
second_kept += 1;
}
}
return reader;
}
// Creates a reader of contiguous elements that
// satisfy the predicate
// pub fn read_while<F>(&mut self, predicate: F) -> StreamReader<'_, T>
// where
// F: Fn(&T) -> bool,
// {
// // Take a normal reader. This contains available elements to read.
// let mut reader = self.read();
//
// // We need to trim the slices to keep only the satified elements
//
// // First slice
// let mut first_kept = 0;
// // SAFETY:
// //
// // Only us can have a reference to these slices of the buffer
// for element in unsafe { &*reader.first.get() }
// {
// // SAFETY
// //
// // If this element is in a reader returned by self.read
// // with no pop called, we know it is initialized
// let init_element = unsafe { element.assume_init_ref() };
// let sat = predicate(init_element);
// if !sat
// {
// // Stop here
// // Forget about second slice
// reader.second_len = 0;
// reader.second = None;
//
// // Trim first slice
// reader.first_len = first_kept;
// unsafe {
// reader.first = std::mem::transmute::<
// &[MaybeUninit<T>],
// &UnsafeCell<[MaybeUninit<T>]>,
// >(&(&*reader.first.get())[0..first_kept]);
// }
//
// return reader;
// }
// first_kept += 1;
// }
//
// // If we are here, all of the elements of the first slice, satisfy the predicate
//
// if let Some(second_slice) = &mut reader.second
// {
// // Second slice
// let mut second_kept = 0;
// // SAFETY:
// //
// // Only us can have a reference to these slices of the buffer
// for element in unsafe { &*second_slice.get() }
// {
// // SAFETY
// //
// // If this element is in a reader returned by self.read
// // with no pop called, we know it is initialized
// let init_element = unsafe { element.assume_init_ref() };
// let sat = predicate(init_element);
// if !sat
// {
// // Stop here
// // Trim second slice
// reader.second_len = second_kept;
// unsafe {
// reader.second = Some(std::mem::transmute::<
// &[MaybeUninit<T>],
// &UnsafeCell<[MaybeUninit<T>]>,
// >(
// &(&*second_slice.get())[0..first_kept]
// ));
// }
// return reader;
// }
// second_kept += 1;
// }
// }
//
// return reader;
// }
}
impl<'a, T> StreamWriter<'a, T>
{
pub fn len(&self) -> usize
impl<T: Copy> StreamConsumer<T>
{
pub fn read(&mut self) -> (&[T], &[T])
{
self.first_len + self.second_len
}
pub fn is_empty(&self) -> bool
{
self.len() == 0
}
pub fn next_index(&self) -> usize
{
self.start_index + self.written.get()
}
pub fn push(&self, element: T) -> Result<(), T>
{
if self.written.get() < self.first_len
let (slice_1, slice_2) = self.read_uninit();
unsafe
{
unsafe {
(&mut *self.first.get())[self.written.get()] = MaybeUninit::new(element);
}
self.written.set(self.written.get() + 1);
Ok(())
(std::mem::transmute(slice_1), std::mem::transmute(slice_2))
}
else if let Some(second) = &self.second
&& self.written.get() - self.first_len < self.second_len
{
unsafe {
(&mut *second.get())[self.written.get() - self.first_len] =
MaybeUninit::new(element);
}
self.written.set(self.written.get() + 1);
Ok(())
}
else
{
Err(element)
}
}
pub fn write(&self, length: usize)
{
let new = self.written.get() + length;
assert!(new < self.len());
self.written.set(new);
}
}
impl<'a, T: Copy> StreamWriter<'a, T>
{
pub fn slices_mut(&mut self) -> (&mut [MaybeUninit<T>], &mut [MaybeUninit<T>])
{
unsafe {
(
&mut *self.first.get(),
self.second
.map(|x| &mut *x.get())
.unwrap_or_else(|| &mut(&mut *self.first.get())[0..0]),
)
}
}
}
impl<'a, T> StreamReader<'a, T>
{
pub fn len(&self) -> usize
{
self.first_len + self.second_len
}
pub fn is_empty(&self) -> bool
{
self.len() == 0
}
pub fn last_index(&self) -> usize
{
self.start_index + self.len()
}
pub fn next_index(&self) -> usize
{
self.start_index + self.read.get()
}
pub fn pop_with_index(&self) -> Option<(T, usize)>
{
let index = self.next_index();
self.pop().map(|t| (t, index))
}
pub fn peek<F, O>(&self, peeker: F) -> Option<O>
where
F: Fn(&T) -> O,
{
// Same as pop, without taking, or increasing read count
if self.read.get() < self.first_len
{
// SAFETY:
//
// If element is in this slice, it is initialized.
// We take it once since read increases
let element = unsafe { (&mut *self.first.get())[self.read.get()].assume_init_ref() };
Some(peeker(element))
}
else if let Some(second) = &self.second
&& self.read.get() - self.first_len < self.second_len
{
let element =
unsafe { (&mut *second.get())[self.read.get() - self.first_len].assume_init_ref() };
Some(peeker(element))
}
else
{
None
}
}
pub fn pop(&self) -> Option<T>
{
if self.read.get() < self.first_len
{
// SAFETY:
//
// If element is in this slice, it is initialized.
// We take it once since read increases
let element = unsafe {
std::mem::replace(
&mut (&mut *self.first.get())[self.read.get()],
MaybeUninit::uninit(),
)
.assume_init()
};
self.read.set(self.read.get() + 1);
Some(element)
}
else if let Some(second) = &self.second
&& self.read.get() - self.first_len < self.second_len
{
let element = unsafe {
std::mem::replace(
&mut (&mut *second.get())[self.read.get() - self.first_len],
MaybeUninit::uninit(),
)
.assume_init()
};
self.read.set(self.read.get() + 1);
Some(element)
}
else
{
None
}
}
pub fn read(&self, length: usize)
{
let new = self.read.get() + length;
assert!(new < self.len());
self.read.set(new);
}
}
impl<'a, T: Copy> StreamReader<'a, T>
{
pub fn slices(&self) -> (&[T], &[T])
{
unsafe {
(
std::mem::transmute::<&[MaybeUninit<T>], &[T]>(&*self.first.get()),
std::mem::transmute::<&[MaybeUninit<T>], &[T]>(
self.second
.map(|x| &*x.get())
.unwrap_or_else(|| &(&*self.first.get())[0..0]),
),
)
}
}
}
// When a Stream writer goes out of scope, it wrote
// some things into the stream. These things need to be commited to the queue
impl<'a, T> Drop for StreamWriter<'a, T>
{
fn drop(&mut self)
{
// Advance head.
// We know that this value hasn't changed since this StreamWriter was created
// let head = self.producer.inner.head.load(Ordering::Relaxed);
// We want writes to the buffer to be visible when acquired in the pop side
self.producer
.inner
.head
.store(self.start_index + self.written.get(), Ordering::Release);
}
}
// When a Stream reader goes out of scope, it took
// some things from the stream. These things need to be de-commited to the queue
impl<'a, T> Drop for StreamReader<'a, T>
{
fn drop(&mut self)
{
// Advance tail.
// We know that this value hasn't changed since this StreamWriter was created
// let tail = self.producer.inner.tail.load(Ordering::Relaxed);
// We want writes to the buffer to be visible when acquired in the push side
self.producer
.inner
.tail
.store(self.start_index + self.read.get(), Ordering::Release);
}
}
mod test
{
use std::mem::MaybeUninit;
#[allow(unused_imports)]
use crate::stream::bounded_queue;
@ -738,57 +428,66 @@ mod test
let (mut tx, mut rx) = bounded_queue::<usize>(4);
{
let writer = tx.write();
let (a, b) = tx.write();
assert_eq!(a.len(), 4);
assert_eq!(b.len(), 0);
assert_eq!(writer.len(), 4);
a[0] = MaybeUninit::new(0);
a[1] = MaybeUninit::new(1);
a[2] = MaybeUninit::new(2);
a[3] = MaybeUninit::new(3);
assert_eq!(writer.push(1), Ok(()));
assert_eq!(writer.push(2), Ok(()));
assert_eq!(writer.push(3), Ok(()));
assert_eq!(writer.push(4), Ok(()));
assert_eq!(writer.push(5), Err(5));
tx.produce(4);
}
{
let reader = rx.read();
let (a, b) = rx.read();
assert_eq!(a.len(), 4);
assert_eq!(b.len(), 0);
assert_eq!(reader.len(), 4);
assert_eq!(a[0], 0);
assert_eq!(a[1], 1);
assert_eq!(a[2], 2);
assert_eq!(a[3], 3);
assert_eq!(reader.pop(), Some(1));
assert_eq!(reader.pop(), Some(2));
assert_eq!(reader.pop(), Some(3));
assert_eq!(reader.pop(), Some(4));
assert_eq!(reader.pop(), None);
rx.consume(4);
}
// Put stream into weird situatino
// Put stream into weird situation
{
let writer = tx.write();
assert_eq!(writer.push(1), Ok(()));
assert_eq!(writer.push(2), Ok(()));
assert_eq!(writer.push(3), Ok(()));
assert_eq!(writer.push(4), Ok(()));
let (a, b) = tx.write();
assert_eq!(a.len(), 4);
assert_eq!(b.len(), 0);
a[0] = MaybeUninit::new(0);
a[1] = MaybeUninit::new(1);
a[2] = MaybeUninit::new(2);
tx.produce(3);
}
{
let reader = rx.read();
assert_eq!(reader.pop(), Some(1));
assert_eq!(reader.pop(), Some(2));
let (a, b) = rx.read();
assert_eq!(a.len(), 3);
assert_eq!(b.len(), 0);
assert_eq!(a[0], 0);
assert_eq!(a[1], 1);
assert_eq!(a[2], 2);
rx.consume(1);
}
{
let writer = tx.write();
assert_eq!(writer.len(), 2);
assert_eq!(writer.push(5), Ok(()));
assert_eq!(writer.push(6), Ok(()));
let (a, b) = tx.write();
assert_eq!(a.len(), 1);
assert_eq!(b.len(), 1);
}
{
let reader = rx.read();
assert_eq!(reader.pop(), Some(3));
assert_eq!(reader.pop(), Some(4));
assert_eq!(reader.pop(), Some(5));
assert_eq!(reader.pop(), Some(6));
let (a, b) = rx.read();
assert_eq!(a.len(), 2);
assert_eq!(b.len(), 0);
}
}
}