This commit is contained in:
2026-03-18 23:33:49 +01:00
parent 4aef173c7c
commit f727c119b8
3 changed files with 176 additions and 135 deletions

View File

@ -80,123 +80,6 @@ impl AnonymousStreamConsumer
}
}
pub struct In<T>
{
stream: Option<StreamConsumer<T>>,
tag_stream: Option<StreamConsumer<Tag>>,
// Will rarely be accessed
edge: Arc<Mutex<Edge>>,
}
pub struct Out<T>
{
stream: Option<StreamProducer<T>>,
tag_stream: Option<StreamProducer<Tag>>,
// Will rarely be accessed
edge: Arc<Mutex<Edge>>,
}
pub fn stream<T>() -> (Out<T>, In<T>)
{
let edge = Arc::new(Mutex::new(Edge::default()));
(
Out {
stream: None,
tag_stream: None,
edge: edge.clone(),
},
In {
stream: None,
tag_stream: None,
edge,
},
)
}
impl<T: 'static> In<T>
{
pub fn set_block_index(&self, index: BlockIOIndex)
{
self.edge.lock().unwrap().to = Some(index);
}
pub fn get_producer_block(&self) -> Option<BlockIOIndex>
{
self.edge.lock().unwrap().from
}
pub fn set_anonymous_stream(&mut self, consumer: AnonymousStreamConsumer)
{
self.stream = Some(consumer.downcast::<T>())
}
pub fn read<'a>(&'a mut self) -> StreamReader<'a, T>
{
self.stream.as_mut().unwrap().read()
}
}
impl<T: 'static> Out<T>
{
pub fn set_block_index(&self, index: BlockIOIndex)
{
self.edge.lock().unwrap().from = Some(index);
}
pub fn get_consumer_block(&self) -> Option<BlockIOIndex>
{
self.edge.lock().unwrap().to
}
pub fn set_anonymous_stream(&mut self, producer: AnonymousStreamProducer)
{
self.stream = Some(producer.downcast::<T>())
}
// Delegate stream creation to Out object
// which knows the stream type
pub fn create_anonymous_stream(
&self,
capacity: usize,
) -> (AnonymousStreamProducer, AnonymousStreamConsumer)
{
let (tx, rx) = stream::bounded_queue::<T>(capacity);
(tx.into(), rx.into())
}
pub fn write<'a>(&'a mut self) -> StreamWriter<'a, T>
{
self.stream.as_mut().unwrap().write()
}
pub fn push_iter<I: Iterator<Item = T>>(&mut self, mut iter: I) -> bool
{
let writer = self.write();
let len = writer.len();
for _ in 0..len
{
if let Some(elt) = iter.next()
{
let _ = writer.push(elt);
}
else
{
return false;
}
}
true
}
// Meta information
pub fn get_type_name(&self) -> &'static str
{
std::any::type_name::<T>()
}
}
pub struct PopIter<T>
{
len: usize,

View File

@ -1,10 +1,16 @@
use std::sync::{Arc, Mutex};
use std::sync::Arc;
use std::sync::Mutex;
use crate::{
edge::{AnonymousStreamConsumer, AnonymousStreamProducer, BlockIOIndex, Edge},
stream::{self, StreamConsumer, StreamProducer, StreamReader, StreamWriter},
tag::Tag,
};
use crate::edge::AnonymousStreamConsumer;
use crate::edge::AnonymousStreamProducer;
use crate::edge::BlockIOIndex;
use crate::edge::Edge;
use crate::stream::StreamConsumer;
use crate::stream::StreamProducer;
use crate::stream::StreamReader;
use crate::stream::StreamWriter;
use crate::stream::{self};
use crate::tag::Tag;
pub struct In<T>
{
@ -72,9 +78,11 @@ impl<T: 'static> In<T>
pub fn read<'a>(&'a mut self) -> InReader<'a, T>
{
let data_reader = self.stream.as_mut().unwrap().read();
let tag_reader = self.tag_stream.as_mut().unwrap().read();
InReader {
data_reader: self.stream.as_mut().unwrap().read(),
tag_reader: self.tag_stream.as_mut().unwrap().read(),
data_reader,
tag_reader,
}
}
}
@ -140,3 +148,37 @@ impl<T: 'static> Out<T>
std::any::type_name::<T>()
}
}
impl<T> InReader<'_, T>
{
pub fn len(&self) -> usize
{
self.data_reader.len()
}
pub fn pop_tagged(&self) -> Option<(T, Option<Tag>)>
{
let data = self.data_reader.pop_with_index();
if let Some((data, index)) = data
{
let mut tag = None;
if self
.tag_reader
.peek(|t| t.position)
.is_some_and(|x| x == index)
{
tag = self.tag_reader.pop();
}
Some((data, tag))
}
else
{
None
}
}
pub fn pop_drop_tag(&self) -> Option<T>
{
self.pop_tagged().map(|(data, _)| data)
}
}

View File

@ -1,5 +1,6 @@
use std::cell::Cell;
use std::cell::UnsafeCell;
use std::io::empty;
use std::mem::MaybeUninit;
use std::ops::Deref;
use std::sync::Arc;
@ -62,6 +63,11 @@ pub struct StreamWriter<'a, T>
first_len: usize,
second_len: usize,
written: Cell<usize>,
// Index of the first element to be pushed
// within the "infinite buffer"
// Used to number tags
start_index: usize,
}
// Represents a read operation within a stream producer
@ -73,6 +79,11 @@ pub struct StreamReader<'a, T>
first_len: usize,
second_len: usize,
read: Cell<usize>,
// Index of the first element to be read
// within the "infinite buffer"
// Used to number tags
start_index: usize,
}
pub fn bounded_queue<T>(capacity: usize) -> (StreamProducer<T>, StreamConsumer<T>)
@ -168,6 +179,8 @@ impl<T> StreamProducer<T>
&UnsafeCell<[MaybeUninit<T>]>,
>(start_to_tail));
StreamWriter {
start_index: head,
producer: self,
first,
second,
@ -224,6 +237,8 @@ impl<T> StreamProducer<T>
>(start_to_tail));
StreamWriter {
start_index: head,
producer: self,
first,
second,
@ -258,9 +273,12 @@ impl<T> StreamProducer<T>
// Head and tail are both indices of the slice
unsafe {
let k = &mut *self.inner.buffer.get();
let len = wrapped_tail - wrapped_head;
StreamWriter {
start_index: head,
producer: self,
first_len: wrapped_tail - wrapped_head,
first_len: len,
second_len: 0,
first: std::mem::transmute::<
&[MaybeUninit<T>],
@ -294,9 +312,12 @@ impl<T> StreamConsumer<T>
// Buffer is empty. Return empty slice
unsafe {
let k = &mut *self.inner.buffer.get();
let len = wrapped_head - wrapped_tail;
StreamReader {
start_index: tail,
producer: self,
first_len: wrapped_head - wrapped_tail,
first_len: len,
second_len: 0,
first: std::mem::transmute::<&[MaybeUninit<T>], &UnsafeCell<[MaybeUninit<T>]>>(
&k[wrapped_tail..wrapped_head],
@ -327,9 +348,12 @@ impl<T> StreamConsumer<T>
// Head and tail are both indices of the slice
unsafe {
let k = &mut *self.inner.buffer.get();
let len = wrapped_head - wrapped_tail;
StreamReader {
start_index: tail,
producer: self,
first_len: wrapped_head - wrapped_tail,
first_len: len,
second_len: 0,
first: std::mem::transmute::<
&[MaybeUninit<T>],
@ -385,6 +409,8 @@ impl<T> StreamConsumer<T>
>(start_to_head));
StreamReader {
start_index: tail,
producer: self,
first,
second,
@ -410,7 +436,10 @@ impl<T> StreamConsumer<T>
// First slice
let mut first_kept = 0;
for element in reader.first.get_mut()
// SAFETY:
//
// Only us can have a reference to these slices of the buffer
for element in unsafe { &*reader.first.get() }
{
// SAFETY
//
@ -425,18 +454,57 @@ impl<T> StreamConsumer<T>
reader.second_len = 0;
reader.second = None;
// Trim first slice
reader.first_len = first_kept;
unsafe {
*reader.first =
std::mem::transmute::<
&mut [MaybeUninit<T>],
&mut UnsafeCell<[MaybeUninit<T>]>,
>(&mut (reader.first.get_mut())[0..first_kept]);
reader.first = std::mem::transmute::<
&[MaybeUninit<T>],
&UnsafeCell<[MaybeUninit<T>]>,
>(&(&*reader.first.get())[0..first_kept]);
}
return reader;
}
first_kept += 1;
}
// If we are here, all of the elements of the first slice, satisfy the predicate
if let Some(second_slice) = &mut reader.second
{
// Second slice
let mut second_kept = 0;
// SAFETY:
//
// Only us can have a reference to these slices of the buffer
for element in unsafe { &*second_slice.get() }
{
// SAFETY
//
// If this element is in a reader returned by self.read
// with no pop called, we know it is initialized
let init_element = unsafe { element.assume_init_ref() };
let sat = predicate(init_element);
if !sat
{
// Stop here
// Trim second slice
reader.second_len = second_kept;
unsafe {
reader.second = Some(std::mem::transmute::<
&[MaybeUninit<T>],
&UnsafeCell<[MaybeUninit<T>]>,
>(
&(&*second_slice.get())[0..first_kept]
));
}
return reader;
}
second_kept += 1;
}
}
todo!()
return reader;
}
}
@ -452,6 +520,11 @@ impl<'a, T> StreamWriter<'a, T>
self.len() == 0
}
pub fn next_index(&self) -> usize
{
self.start_index + self.written.get()
}
pub fn push(&self, element: T) -> Result<(), T>
{
if self.written.get() < self.first_len
@ -491,6 +564,49 @@ impl<'a, T> StreamReader<'a, T>
self.len() == 0
}
pub fn last_index(&self) -> usize
{
self.start_index + self.len()
}
pub fn next_index(&self) -> usize
{
self.start_index + self.read.get()
}
pub fn pop_with_index(&self) -> Option<(T, usize)>
{
let index = self.next_index();
self.pop().map(|t| (t, index))
}
pub fn peek<F, O>(&self, peeker: F) -> Option<O>
where
F: Fn(&T) -> O,
{
// Same as pop, without taking, or increasing read count
if self.read.get() < self.first_len
{
// SAFETY:
//
// If element is in this slice, it is initialized.
// We take it once since read increases
let element = unsafe { (&mut *self.first.get())[self.read.get()].assume_init_ref() };
Some(peeker(element))
}
else if let Some(second) = &self.second
&& self.read.get() - self.first_len < self.second_len
{
let element =
unsafe { (&mut *second.get())[self.read.get() - self.first_len].assume_init_ref() };
Some(peeker(element))
}
else
{
None
}
}
pub fn pop(&self) -> Option<T>
{
if self.read.get() < self.first_len