diff --git a/src/lib.rs b/src/lib.rs index 683e0b9..baf29e0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1 +1 @@ -mod stream; +pub mod stream; diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..4063b53 --- /dev/null +++ b/src/main.rs @@ -0,0 +1,26 @@ +use std::time::Duration; + +use rdsp::stream; + +fn main() { + let (mut a, b) = stream::stream(16, 0); + std::thread::spawn(move || { + loop { + std::thread::sleep(Duration::from_millis(500)); + let mut buf = a.write(); + for i in 0..16 { + buf[i] = i; + } + buf.swap(16); + } + }); + + { + loop { + let x = b.read(); + for i in 0..16 { + println!("{}", x[i]); + } + } + } +} diff --git a/src/stream.rs b/src/stream.rs index 27cf766..cbb1fce 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -1,78 +1,185 @@ // Streams moves data from one place to another // Implementation of double buffered stream interfaces -use std::sync::Arc; +use std::{ + cell::UnsafeCell, + ops::{Index, IndexMut}, + sync::{Arc, Condvar, Mutex, RwLock}, +}; struct StreamInner { - // Underlying buffer of data - buffer: Box<[T]>, buffer_size: usize, - // Current half of the buffer that is on the writer's side - current_write: bool, - available: usize, - written: usize, + // Underlying buffer of data + write_buffer: UnsafeCell>, + read_buffer: UnsafeCell>, + + written: RwLock, + available: RwLock, + + data_rdy: Mutex, + swap_rdy: Mutex, + + swap_rdy_cvar: Condvar, + swaped_cvar: Condvar, } -struct StreamReader { +unsafe impl Sync for StreamInner {} +unsafe impl Send for StreamInner {} + +pub struct StreamReader { inner: Arc>, } -struct StreamWriter { +pub struct StreamWriter { inner: Arc>, } impl StreamInner { pub fn new(buffer_size: usize, default: T) -> Self { Self { - buffer: vec![default; buffer_size * 2].into_boxed_slice(), buffer_size, - current_write: false, - available: 0, - written: 0, + + write_buffer: UnsafeCell::new(vec![default.clone(); buffer_size].into_boxed_slice()), + read_buffer: UnsafeCell::new(vec![default.clone(); buffer_size].into_boxed_slice()), + + swaped_cvar: Condvar::new(), + swap_rdy_cvar: Condvar::new(), + + data_rdy: Mutex::new(false), + swap_rdy: Mutex::new(true), + + available: RwLock::new(0), + written: RwLock::new(0), } } - - pub fn get_read_slice(&self) -> &[T] { - let offset = if self.current_write { - 0 - } else { - self.buffer_size - }; - - &self.buffer[offset..(offset + self.buffer_size)] - } - - pub fn get_write_slice(&mut self) -> &mut [T] { - let offset = if self.current_write { - self.buffer_size - } else { - 0 - }; - - &mut self.buffer[offset..offset + self.buffer_size] - } -} - -enum StreamWriteError { - BufferFull, } impl StreamWriter { - pub fn write(&mut self, data: T) -> Result<(), StreamWriteError> { - if self.inner.written == self.inner.buffer_size { - Err(StreamWriteError::BufferFull) - } else { - unsafe { - *self.inner.get_write_slice()[self.inner.written].assume_init_mut() = data; - self.inner.a - } - self.inner.written += 1; - Ok(()) + pub fn write(&mut self) -> StreamWriterGuard<'_, T> { + StreamWriterGuard { inner: self } + } +} + +pub fn stream( + buffer_size: usize, + default: T, +) -> (StreamWriter, StreamReader) { + let inner = Arc::new(StreamInner::new(buffer_size, default)); + + ( + StreamWriter { + inner: inner.clone(), + }, + StreamReader { + inner: inner.clone(), + }, + ) +} + +pub struct StreamWriterGuard<'a, T> { + inner: &'a StreamWriter, +} + +impl<'a, T> StreamWriterGuard<'a, T> { + pub fn len(&self) -> usize { + self.inner.inner.buffer_size + } + + pub fn is_empty(&self) -> bool { + self.inner.inner.buffer_size == 0 + } + + pub fn swap(self, size: usize) { + *self.inner.inner.written.write().unwrap() = size; + } +} + +impl<'a, T> Drop for StreamWriterGuard<'a, T> { + fn drop(&mut self) { + // Wait for swap to be ready + let mut swap_rdy = self.inner.inner.swap_rdy.lock().unwrap(); + while !*swap_rdy { + swap_rdy = self.inner.inner.swap_rdy_cvar.wait(swap_rdy).unwrap(); + } + + // At this point swap is ready + let mut written = self.inner.inner.written.write().unwrap(); + *self.inner.inner.available.write().unwrap() = *written; + *written = 0; + + // Actually swap buffers + unsafe { + std::mem::swap( + &mut *self.inner.inner.write_buffer.get(), + &mut *self.inner.inner.read_buffer.get(), + ); + } + + *swap_rdy = false; + *self.inner.inner.data_rdy.lock().unwrap() = true; + self.inner.inner.swaped_cvar.notify_one(); + } +} + +impl<'a, T> Index for StreamWriterGuard<'a, T> { + type Output = T; + + fn index(&self, index: usize) -> &Self::Output { + assert!(index < self.inner.inner.buffer_size, "Index out of bounds"); + unsafe { &(&*self.inner.inner.write_buffer.get())[index] } + } +} + +impl<'a, T> IndexMut for StreamWriterGuard<'a, T> { + fn index_mut(&mut self, index: usize) -> &mut Self::Output { + assert!(index < self.inner.inner.buffer_size, "Index out of bounds"); + unsafe { &mut (&mut *self.inner.inner.write_buffer.get())[index] } + } +} + +impl StreamReader { + pub fn read(&self) -> StreamReaderGuard<'_, T> { + *self + .inner + .swaped_cvar + .wait_while(self.inner.data_rdy.lock().unwrap(), |ready| !*ready) + .unwrap() = false; + + StreamReaderGuard { + inner: self, + available: *self.inner.available.read().unwrap(), } } } -pub fn stream(buffer_size: usize) -> (StreamWriter, StreamReader) { - todo!() +pub struct StreamReaderGuard<'a, T> { + inner: &'a StreamReader, + available: usize, +} + +impl<'a, T> StreamReaderGuard<'a, T> { + pub fn len(&self) -> usize { + self.available + } + + pub fn is_empty(&self) -> bool { + self.available == 0 + } +} + +impl<'a, T> Drop for StreamReaderGuard<'a, T> { + fn drop(&mut self) { + *self.inner.inner.swap_rdy.lock().unwrap() = true; + self.inner.inner.swap_rdy_cvar.notify_one(); + } +} + +impl<'a, T> Index for StreamReaderGuard<'a, T> { + type Output = T; + + fn index(&self, index: usize) -> &Self::Output { + assert!(index < self.available, "Index out of bounds"); + unsafe { &(&*self.inner.inner.read_buffer.get())[index] } + } }