Better interface for stream readers

This commit is contained in:
2025-10-23 20:52:15 +02:00
parent 6bc7c2ab16
commit e0dc672b58
4 changed files with 120 additions and 34 deletions

45
examples/streams.rs Normal file
View File

@ -0,0 +1,45 @@
use std::time::Duration;
use rdsp::{stream, stream_operators};
fn main() {
let (mut a_tx, a_rx) = stream::stream::<u32>(16);
let (mut b_tx, b_rx) = stream::stream::<u32>(16);
std::thread::spawn(move || {
let mut k = 0;
loop {
std::thread::sleep(Duration::from_millis(800));
let mut buf = a_tx.write();
for i in 0..16 {
buf[i].write(k);
k += 1;
}
buf.swap(16);
}
});
std::thread::spawn(move || {
let mut k = 0;
loop {
std::thread::sleep(Duration::from_millis(300));
let mut buf = b_tx.write();
for i in 0..16 {
buf[i].write(k);
k += 1;
}
buf.swap(16);
}
});
let (adder, out) = stream_operators::StreamAdder::new(a_rx, b_rx);
{
loop {
let x = out.read();
for i in 0..x.len() {
println!("{}", x[i]);
}
}
}
}

View File

@ -9,7 +9,7 @@ fn main() {
std::thread::spawn(move || {
let mut k = 0;
loop {
std::thread::sleep(Duration::from_millis(500));
std::thread::sleep(Duration::from_millis(8));
let mut buf = a_tx.write();
for i in 0..16 {
buf[i].write(k);
@ -22,7 +22,7 @@ fn main() {
std::thread::spawn(move || {
let mut k = 0;
loop {
std::thread::sleep(Duration::from_millis(500));
std::thread::sleep(Duration::from_millis(3));
let mut buf = b_tx.write();
for i in 0..16 {
buf[i].write(k);

View File

@ -5,10 +5,11 @@ use std::{
cell::UnsafeCell,
mem::MaybeUninit,
ops::{Index, IndexMut},
slice::Iter,
sync::{Arc, Condvar, Mutex, RwLock},
};
// Internals
struct StreamInner<T: Sized> {
buffer_size: usize,
@ -29,14 +30,6 @@ struct StreamInner<T: Sized> {
unsafe impl<T> Sync for StreamInner<T> {}
unsafe impl<T> Send for StreamInner<T> {}
pub struct StreamReader<T: Sized> {
inner: Arc<StreamInner<T>>,
}
pub struct StreamWriter<T: Sized> {
inner: Arc<StreamInner<T>>,
}
impl<T: Sized> StreamInner<T> {
pub fn new(buffer_size: usize) -> Self {
Self {
@ -67,10 +60,12 @@ impl<T: Sized> StreamInner<T> {
}
}
impl<T: Sized> StreamWriter<T> {
pub fn write(&mut self) -> StreamWriterGuard<'_, T> {
StreamWriterGuard { inner: self }
}
pub struct StreamReader<T: Sized> {
inner: Arc<StreamInner<T>>,
}
pub struct StreamWriter<T: Sized> {
inner: Arc<StreamInner<T>>,
}
pub fn stream<T: Sized>(buffer_size: usize) -> (StreamWriter<T>, StreamReader<T>) {
@ -86,6 +81,14 @@ pub fn stream<T: Sized>(buffer_size: usize) -> (StreamWriter<T>, StreamReader<T>
)
}
// Stream writer
impl<T: Sized> StreamWriter<T> {
pub fn write(&mut self) -> StreamWriterGuard<'_, T> {
StreamWriterGuard { inner: self }
}
}
pub struct StreamWriterGuard<'a, T> {
inner: &'a StreamWriter<T>,
}
@ -174,6 +177,56 @@ impl<'a, T> IndexMut<usize> for StreamWriterGuard<'a, T> {
}
}
// Stream reader
impl<'a, T: Copy + Sized> StreamReader<T> {
pub fn iter(&'a self) -> StreamReaderIter<'a, T> {
StreamReaderIter {
reader: self,
current_guard: None,
current_index: 0,
}
}
}
pub struct StreamReaderIter<'a, T> {
reader: &'a StreamReader<T>,
current_guard: Option<StreamReaderGuard<'a, T>>,
current_index: usize,
}
impl<'a, T: Copy> Iterator for StreamReaderIter<'a, T> {
type Item = T;
fn next(&mut self) -> Option<Self::Item> {
if self.current_guard.is_none()
|| self.current_index == self.current_guard.as_ref().unwrap().len()
{
if self.current_guard.is_some() {
let old = self.current_guard.take();
drop(old);
}
self.current_guard = Some(self.reader.read());
self.current_index = 0;
}
let guard = self.current_guard.as_ref().unwrap();
let ret = Some(guard[self.current_index]);
self.current_index += 1;
ret
}
}
pub struct StreamReaderGuard<'a, T> {
inner: &'a StreamReader<T>,
available: usize,
}
pub struct StreamReaderGuardIter<'a, T> {
guard: &'a StreamReaderGuard<'a, T>,
current_index: usize,
}
impl<T: Sized> StreamReader<T> {
pub fn read(&self) -> StreamReaderGuard<'_, T> {
*self
@ -189,16 +242,6 @@ impl<T: Sized> StreamReader<T> {
}
}
pub struct StreamReaderGuard<'a, T> {
inner: &'a StreamReader<T>,
available: usize,
}
pub struct StreamReaderGuardIter<'a, T> {
guard: &'a StreamReaderGuard<'a, T>,
current_index: usize,
}
impl<'a, T> StreamReaderGuard<'a, T> {
pub fn len(&self) -> usize {
self.available

View File

@ -1,4 +1,4 @@
use std::{fmt::write, ops::Add};
use std::ops::Add;
use crate::stream::{StreamReader, stream};
@ -8,19 +8,17 @@ pub struct StreamAdder<T> {
impl<T: Add<T, Output = T> + Copy + 'static> StreamAdder<T> {
pub fn new(input_a: StreamReader<T>, input_b: StreamReader<T>) -> (Self, StreamReader<T>) {
let (writer, reader) = stream::<T>(1024);
let (mut writer, reader) = stream::<T>(1024);
std::thread::spawn(move || {
let mut writer = writer;
//let mut writer = writer;
let mut iter1 = input_a.iter();
let mut iter2 = input_b.iter();
loop {
let reader_a = input_a.read();
let reader_b = input_b.read();
let mut write = writer.write();
let len = reader_a.len().min(reader_b.len()).min(write.len());
let len = write.len();
for i in 0..len {
write[i].write(reader_a[i] + reader_b[i]);
write[i].write(iter1.next().unwrap() + iter2.next().unwrap());
}
write.swap(len);
}
});