Adds stream

This commit is contained in:
2025-10-23 09:41:29 +02:00
parent 59ded0585b
commit 3655a7dff0
3 changed files with 183 additions and 50 deletions

View File

@ -1 +1 @@
mod stream;
pub mod stream;

26
src/main.rs Normal file
View File

@ -0,0 +1,26 @@
use std::time::Duration;
use rdsp::stream;
fn main() {
let (mut a, b) = stream::stream(16, 0);
std::thread::spawn(move || {
loop {
std::thread::sleep(Duration::from_millis(500));
let mut buf = a.write();
for i in 0..16 {
buf[i] = i;
}
buf.swap(16);
}
});
{
loop {
let x = b.read();
for i in 0..16 {
println!("{}", x[i]);
}
}
}
}

View File

@ -1,78 +1,185 @@
// Streams moves data from one place to another
// Implementation of double buffered stream interfaces
use std::sync::Arc;
use std::{
cell::UnsafeCell,
ops::{Index, IndexMut},
sync::{Arc, Condvar, Mutex, RwLock},
};
struct StreamInner<T: Sized> {
// Underlying buffer of data
buffer: Box<[T]>,
buffer_size: usize,
// Current half of the buffer that is on the writer's side
current_write: bool,
available: usize,
written: usize,
// Underlying buffer of data
write_buffer: UnsafeCell<Box<[T]>>,
read_buffer: UnsafeCell<Box<[T]>>,
written: RwLock<usize>,
available: RwLock<usize>,
data_rdy: Mutex<bool>,
swap_rdy: Mutex<bool>,
swap_rdy_cvar: Condvar,
swaped_cvar: Condvar,
}
struct StreamReader<T: Sized> {
unsafe impl<T> Sync for StreamInner<T> {}
unsafe impl<T> Send for StreamInner<T> {}
pub struct StreamReader<T: Sized> {
inner: Arc<StreamInner<T>>,
}
struct StreamWriter<T: Sized> {
pub struct StreamWriter<T: Sized> {
inner: Arc<StreamInner<T>>,
}
impl<T: Sized + Clone> StreamInner<T> {
pub fn new(buffer_size: usize, default: T) -> Self {
Self {
buffer: vec![default; buffer_size * 2].into_boxed_slice(),
buffer_size,
current_write: false,
available: 0,
written: 0,
write_buffer: UnsafeCell::new(vec![default.clone(); buffer_size].into_boxed_slice()),
read_buffer: UnsafeCell::new(vec![default.clone(); buffer_size].into_boxed_slice()),
swaped_cvar: Condvar::new(),
swap_rdy_cvar: Condvar::new(),
data_rdy: Mutex::new(false),
swap_rdy: Mutex::new(true),
available: RwLock::new(0),
written: RwLock::new(0),
}
}
pub fn get_read_slice(&self) -> &[T] {
let offset = if self.current_write {
0
} else {
self.buffer_size
};
&self.buffer[offset..(offset + self.buffer_size)]
}
pub fn get_write_slice(&mut self) -> &mut [T] {
let offset = if self.current_write {
self.buffer_size
} else {
0
};
&mut self.buffer[offset..offset + self.buffer_size]
}
}
enum StreamWriteError {
BufferFull,
}
impl<T: Sized> StreamWriter<T> {
pub fn write(&mut self, data: T) -> Result<(), StreamWriteError> {
if self.inner.written == self.inner.buffer_size {
Err(StreamWriteError::BufferFull)
} else {
unsafe {
*self.inner.get_write_slice()[self.inner.written].assume_init_mut() = data;
self.inner.a
}
self.inner.written += 1;
Ok(())
pub fn write(&mut self) -> StreamWriterGuard<'_, T> {
StreamWriterGuard { inner: self }
}
}
pub fn stream<T: Sized + Clone>(
buffer_size: usize,
default: T,
) -> (StreamWriter<T>, StreamReader<T>) {
let inner = Arc::new(StreamInner::new(buffer_size, default));
(
StreamWriter {
inner: inner.clone(),
},
StreamReader {
inner: inner.clone(),
},
)
}
pub struct StreamWriterGuard<'a, T> {
inner: &'a StreamWriter<T>,
}
impl<'a, T> StreamWriterGuard<'a, T> {
pub fn len(&self) -> usize {
self.inner.inner.buffer_size
}
pub fn is_empty(&self) -> bool {
self.inner.inner.buffer_size == 0
}
pub fn swap(self, size: usize) {
*self.inner.inner.written.write().unwrap() = size;
}
}
impl<'a, T> Drop for StreamWriterGuard<'a, T> {
fn drop(&mut self) {
// Wait for swap to be ready
let mut swap_rdy = self.inner.inner.swap_rdy.lock().unwrap();
while !*swap_rdy {
swap_rdy = self.inner.inner.swap_rdy_cvar.wait(swap_rdy).unwrap();
}
// At this point swap is ready
let mut written = self.inner.inner.written.write().unwrap();
*self.inner.inner.available.write().unwrap() = *written;
*written = 0;
// Actually swap buffers
unsafe {
std::mem::swap(
&mut *self.inner.inner.write_buffer.get(),
&mut *self.inner.inner.read_buffer.get(),
);
}
*swap_rdy = false;
*self.inner.inner.data_rdy.lock().unwrap() = true;
self.inner.inner.swaped_cvar.notify_one();
}
}
impl<'a, T> Index<usize> for StreamWriterGuard<'a, T> {
type Output = T;
fn index(&self, index: usize) -> &Self::Output {
assert!(index < self.inner.inner.buffer_size, "Index out of bounds");
unsafe { &(&*self.inner.inner.write_buffer.get())[index] }
}
}
impl<'a, T> IndexMut<usize> for StreamWriterGuard<'a, T> {
fn index_mut(&mut self, index: usize) -> &mut Self::Output {
assert!(index < self.inner.inner.buffer_size, "Index out of bounds");
unsafe { &mut (&mut *self.inner.inner.write_buffer.get())[index] }
}
}
impl<T: Sized> StreamReader<T> {
pub fn read(&self) -> StreamReaderGuard<'_, T> {
*self
.inner
.swaped_cvar
.wait_while(self.inner.data_rdy.lock().unwrap(), |ready| !*ready)
.unwrap() = false;
StreamReaderGuard {
inner: self,
available: *self.inner.available.read().unwrap(),
}
}
}
pub fn stream<T: Sized>(buffer_size: usize) -> (StreamWriter<T>, StreamReader<T>) {
todo!()
pub struct StreamReaderGuard<'a, T> {
inner: &'a StreamReader<T>,
available: usize,
}
impl<'a, T> StreamReaderGuard<'a, T> {
pub fn len(&self) -> usize {
self.available
}
pub fn is_empty(&self) -> bool {
self.available == 0
}
}
impl<'a, T> Drop for StreamReaderGuard<'a, T> {
fn drop(&mut self) {
*self.inner.inner.swap_rdy.lock().unwrap() = true;
self.inner.inner.swap_rdy_cvar.notify_one();
}
}
impl<'a, T> Index<usize> for StreamReaderGuard<'a, T> {
type Output = T;
fn index(&self, index: usize) -> &Self::Output {
assert!(index < self.available, "Index out of bounds");
unsafe { &(&*self.inner.inner.read_buffer.get())[index] }
}
}