From 915d643dfa8420d110338dd60bb715469533f661 Mon Sep 17 00:00:00 2001 From: Albin Chaboissier Date: Thu, 23 Oct 2025 13:55:06 +0200 Subject: [PATCH] Working streams --- src/block.rs | 3 +++ src/lib.rs | 2 ++ src/main.rs | 6 ++++-- src/stream.rs | 41 ++++++++++++++++++++++++++++------------- src/stream_operators.rs | 22 ++++++++++++++++++++++ 5 files changed, 59 insertions(+), 15 deletions(-) create mode 100644 src/block.rs create mode 100644 src/stream_operators.rs diff --git a/src/block.rs b/src/block.rs new file mode 100644 index 0000000..2557d1a --- /dev/null +++ b/src/block.rs @@ -0,0 +1,3 @@ +use crate::stream::{StreamReader, stream}; + +pub trait Block {} diff --git a/src/lib.rs b/src/lib.rs index baf29e0..355e174 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1 +1,3 @@ +pub mod block; pub mod stream; +pub mod stream_operators; diff --git a/src/main.rs b/src/main.rs index 4063b53..e50b36b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,13 +3,15 @@ use std::time::Duration; use rdsp::stream; fn main() { - let (mut a, b) = stream::stream(16, 0); + let (mut a, b) = stream::stream::(16); std::thread::spawn(move || { + let mut k = 0; loop { std::thread::sleep(Duration::from_millis(500)); let mut buf = a.write(); for i in 0..16 { - buf[i] = i; + buf[i].write(k); + k += 1; } buf.swap(16); } diff --git a/src/stream.rs b/src/stream.rs index cbb1fce..f155deb 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -3,7 +3,9 @@ use std::{ cell::UnsafeCell, + mem::MaybeUninit, ops::{Index, IndexMut}, + slice::Iter, sync::{Arc, Condvar, Mutex, RwLock}, }; @@ -11,8 +13,8 @@ struct StreamInner { buffer_size: usize, // Underlying buffer of data - write_buffer: UnsafeCell>, - read_buffer: UnsafeCell>, + write_buffer: UnsafeCell]>>, + read_buffer: UnsafeCell]>>, written: RwLock, available: RwLock, @@ -35,13 +37,23 @@ pub struct StreamWriter { inner: Arc>, } -impl StreamInner { - pub fn new(buffer_size: usize, default: T) -> Self { +impl StreamInner { + pub fn new(buffer_size: usize) -> Self { Self { buffer_size, - write_buffer: UnsafeCell::new(vec![default.clone(); buffer_size].into_boxed_slice()), - read_buffer: UnsafeCell::new(vec![default.clone(); buffer_size].into_boxed_slice()), + write_buffer: UnsafeCell::new( + (0..buffer_size) + .map(|_| MaybeUninit::zeroed()) + .collect::>() + .into_boxed_slice(), + ), + read_buffer: UnsafeCell::new( + (0..buffer_size) + .map(|_| MaybeUninit::zeroed()) + .collect::>() + .into_boxed_slice(), + ), swaped_cvar: Condvar::new(), swap_rdy_cvar: Condvar::new(), @@ -61,11 +73,8 @@ impl StreamWriter { } } -pub fn stream( - buffer_size: usize, - default: T, -) -> (StreamWriter, StreamReader) { - let inner = Arc::new(StreamInner::new(buffer_size, default)); +pub fn stream(buffer_size: usize) -> (StreamWriter, StreamReader) { + let inner = Arc::new(StreamInner::new(buffer_size)); ( StreamWriter { @@ -123,7 +132,7 @@ impl<'a, T> Drop for StreamWriterGuard<'a, T> { } impl<'a, T> Index for StreamWriterGuard<'a, T> { - type Output = T; + type Output = MaybeUninit; fn index(&self, index: usize) -> &Self::Output { assert!(index < self.inner.inner.buffer_size, "Index out of bounds"); @@ -166,6 +175,12 @@ impl<'a, T> StreamReaderGuard<'a, T> { pub fn is_empty(&self) -> bool { self.available == 0 } + + pub fn iter(&self) -> Iter<'_, T> { + unsafe { + let x = (*self.inner.inner.read_buffer.get()); + } + } } impl<'a, T> Drop for StreamReaderGuard<'a, T> { @@ -180,6 +195,6 @@ impl<'a, T> Index for StreamReaderGuard<'a, T> { fn index(&self, index: usize) -> &Self::Output { assert!(index < self.available, "Index out of bounds"); - unsafe { &(&*self.inner.inner.read_buffer.get())[index] } + unsafe { &(&*self.inner.inner.read_buffer.get())[index].assume_init_ref() } } } diff --git a/src/stream_operators.rs b/src/stream_operators.rs new file mode 100644 index 0000000..3765938 --- /dev/null +++ b/src/stream_operators.rs @@ -0,0 +1,22 @@ +use std::ops::Add; + +use crate::stream::{StreamReader, stream}; + +struct StreamAdder { + _phantom: std::marker::PhantomData, +} + +impl + 'static> StreamAdder { + pub fn new(inputs: Vec>) -> (Self, StreamReader) { + let (mut writer, reader) = stream::(1024); + std::thread::spawn(move || { + let readers = inputs.iter().map(|r| r.read()).collect::>(); + }); + ( + Self { + _phantom: Default::default(), + }, + reader, + ) + } +}