Working streams
This commit is contained in:
3
src/block.rs
Normal file
3
src/block.rs
Normal file
@ -0,0 +1,3 @@
|
||||
use crate::stream::{StreamReader, stream};
|
||||
|
||||
pub trait Block<I> {}
|
||||
@ -1 +1,3 @@
|
||||
pub mod block;
|
||||
pub mod stream;
|
||||
pub mod stream_operators;
|
||||
|
||||
@ -3,13 +3,15 @@ use std::time::Duration;
|
||||
use rdsp::stream;
|
||||
|
||||
fn main() {
|
||||
let (mut a, b) = stream::stream(16, 0);
|
||||
let (mut a, b) = stream::stream::<u32>(16);
|
||||
std::thread::spawn(move || {
|
||||
let mut k = 0;
|
||||
loop {
|
||||
std::thread::sleep(Duration::from_millis(500));
|
||||
let mut buf = a.write();
|
||||
for i in 0..16 {
|
||||
buf[i] = i;
|
||||
buf[i].write(k);
|
||||
k += 1;
|
||||
}
|
||||
buf.swap(16);
|
||||
}
|
||||
|
||||
@ -3,7 +3,9 @@
|
||||
|
||||
use std::{
|
||||
cell::UnsafeCell,
|
||||
mem::MaybeUninit,
|
||||
ops::{Index, IndexMut},
|
||||
slice::Iter,
|
||||
sync::{Arc, Condvar, Mutex, RwLock},
|
||||
};
|
||||
|
||||
@ -11,8 +13,8 @@ struct StreamInner<T: Sized> {
|
||||
buffer_size: usize,
|
||||
|
||||
// Underlying buffer of data
|
||||
write_buffer: UnsafeCell<Box<[T]>>,
|
||||
read_buffer: UnsafeCell<Box<[T]>>,
|
||||
write_buffer: UnsafeCell<Box<[MaybeUninit<T>]>>,
|
||||
read_buffer: UnsafeCell<Box<[MaybeUninit<T>]>>,
|
||||
|
||||
written: RwLock<usize>,
|
||||
available: RwLock<usize>,
|
||||
@ -35,13 +37,23 @@ pub struct StreamWriter<T: Sized> {
|
||||
inner: Arc<StreamInner<T>>,
|
||||
}
|
||||
|
||||
impl<T: Sized + Clone> StreamInner<T> {
|
||||
pub fn new(buffer_size: usize, default: T) -> Self {
|
||||
impl<T: Sized> StreamInner<T> {
|
||||
pub fn new(buffer_size: usize) -> Self {
|
||||
Self {
|
||||
buffer_size,
|
||||
|
||||
write_buffer: UnsafeCell::new(vec![default.clone(); buffer_size].into_boxed_slice()),
|
||||
read_buffer: UnsafeCell::new(vec![default.clone(); buffer_size].into_boxed_slice()),
|
||||
write_buffer: UnsafeCell::new(
|
||||
(0..buffer_size)
|
||||
.map(|_| MaybeUninit::zeroed())
|
||||
.collect::<Vec<_>>()
|
||||
.into_boxed_slice(),
|
||||
),
|
||||
read_buffer: UnsafeCell::new(
|
||||
(0..buffer_size)
|
||||
.map(|_| MaybeUninit::zeroed())
|
||||
.collect::<Vec<_>>()
|
||||
.into_boxed_slice(),
|
||||
),
|
||||
|
||||
swaped_cvar: Condvar::new(),
|
||||
swap_rdy_cvar: Condvar::new(),
|
||||
@ -61,11 +73,8 @@ impl<T: Sized> StreamWriter<T> {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn stream<T: Sized + Clone>(
|
||||
buffer_size: usize,
|
||||
default: T,
|
||||
) -> (StreamWriter<T>, StreamReader<T>) {
|
||||
let inner = Arc::new(StreamInner::new(buffer_size, default));
|
||||
pub fn stream<T: Sized>(buffer_size: usize) -> (StreamWriter<T>, StreamReader<T>) {
|
||||
let inner = Arc::new(StreamInner::new(buffer_size));
|
||||
|
||||
(
|
||||
StreamWriter {
|
||||
@ -123,7 +132,7 @@ impl<'a, T> Drop for StreamWriterGuard<'a, T> {
|
||||
}
|
||||
|
||||
impl<'a, T> Index<usize> for StreamWriterGuard<'a, T> {
|
||||
type Output = T;
|
||||
type Output = MaybeUninit<T>;
|
||||
|
||||
fn index(&self, index: usize) -> &Self::Output {
|
||||
assert!(index < self.inner.inner.buffer_size, "Index out of bounds");
|
||||
@ -166,6 +175,12 @@ impl<'a, T> StreamReaderGuard<'a, T> {
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.available == 0
|
||||
}
|
||||
|
||||
pub fn iter(&self) -> Iter<'_, T> {
|
||||
unsafe {
|
||||
let x = (*self.inner.inner.read_buffer.get());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, T> Drop for StreamReaderGuard<'a, T> {
|
||||
@ -180,6 +195,6 @@ impl<'a, T> Index<usize> for StreamReaderGuard<'a, T> {
|
||||
|
||||
fn index(&self, index: usize) -> &Self::Output {
|
||||
assert!(index < self.available, "Index out of bounds");
|
||||
unsafe { &(&*self.inner.inner.read_buffer.get())[index] }
|
||||
unsafe { &(&*self.inner.inner.read_buffer.get())[index].assume_init_ref() }
|
||||
}
|
||||
}
|
||||
|
||||
22
src/stream_operators.rs
Normal file
22
src/stream_operators.rs
Normal file
@ -0,0 +1,22 @@
|
||||
use std::ops::Add;
|
||||
|
||||
use crate::stream::{StreamReader, stream};
|
||||
|
||||
struct StreamAdder<T> {
|
||||
_phantom: std::marker::PhantomData<T>,
|
||||
}
|
||||
|
||||
impl<T: Add<Output = T> + 'static> StreamAdder<T> {
|
||||
pub fn new(inputs: Vec<StreamReader<T>>) -> (Self, StreamReader<T>) {
|
||||
let (mut writer, reader) = stream::<T>(1024);
|
||||
std::thread::spawn(move || {
|
||||
let readers = inputs.iter().map(|r| r.read()).collect::<Vec<_>>();
|
||||
});
|
||||
(
|
||||
Self {
|
||||
_phantom: Default::default(),
|
||||
},
|
||||
reader,
|
||||
)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user