diff --git a/examples/streams.rs b/examples/streams.rs new file mode 100644 index 0000000..b3ab6e0 --- /dev/null +++ b/examples/streams.rs @@ -0,0 +1,45 @@ +use std::time::Duration; + +use rdsp::{stream, stream_operators}; + +fn main() { + let (mut a_tx, a_rx) = stream::stream::(16); + let (mut b_tx, b_rx) = stream::stream::(16); + + std::thread::spawn(move || { + let mut k = 0; + loop { + std::thread::sleep(Duration::from_millis(800)); + let mut buf = a_tx.write(); + for i in 0..16 { + buf[i].write(k); + k += 1; + } + buf.swap(16); + } + }); + + std::thread::spawn(move || { + let mut k = 0; + loop { + std::thread::sleep(Duration::from_millis(300)); + let mut buf = b_tx.write(); + for i in 0..16 { + buf[i].write(k); + k += 1; + } + buf.swap(16); + } + }); + + let (adder, out) = stream_operators::StreamAdder::new(a_rx, b_rx); + + { + loop { + let x = out.read(); + for i in 0..x.len() { + println!("{}", x[i]); + } + } + } +} diff --git a/src/main.rs b/src/main.rs index 989a074..f96f5f2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,7 +9,7 @@ fn main() { std::thread::spawn(move || { let mut k = 0; loop { - std::thread::sleep(Duration::from_millis(500)); + std::thread::sleep(Duration::from_millis(8)); let mut buf = a_tx.write(); for i in 0..16 { buf[i].write(k); @@ -22,7 +22,7 @@ fn main() { std::thread::spawn(move || { let mut k = 0; loop { - std::thread::sleep(Duration::from_millis(500)); + std::thread::sleep(Duration::from_millis(3)); let mut buf = b_tx.write(); for i in 0..16 { buf[i].write(k); diff --git a/src/stream.rs b/src/stream.rs index 7f8562c..5d52967 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -5,10 +5,11 @@ use std::{ cell::UnsafeCell, mem::MaybeUninit, ops::{Index, IndexMut}, - slice::Iter, sync::{Arc, Condvar, Mutex, RwLock}, }; +// Internals + struct StreamInner { buffer_size: usize, @@ -29,14 +30,6 @@ struct StreamInner { unsafe impl Sync for StreamInner {} unsafe impl Send for StreamInner {} -pub struct StreamReader { - inner: Arc>, -} - -pub struct StreamWriter { - inner: Arc>, -} - impl StreamInner { pub fn new(buffer_size: usize) -> Self { Self { @@ -67,10 +60,12 @@ impl StreamInner { } } -impl StreamWriter { - pub fn write(&mut self) -> StreamWriterGuard<'_, T> { - StreamWriterGuard { inner: self } - } +pub struct StreamReader { + inner: Arc>, +} + +pub struct StreamWriter { + inner: Arc>, } pub fn stream(buffer_size: usize) -> (StreamWriter, StreamReader) { @@ -86,6 +81,14 @@ pub fn stream(buffer_size: usize) -> (StreamWriter, StreamReader ) } +// Stream writer + +impl StreamWriter { + pub fn write(&mut self) -> StreamWriterGuard<'_, T> { + StreamWriterGuard { inner: self } + } +} + pub struct StreamWriterGuard<'a, T> { inner: &'a StreamWriter, } @@ -174,6 +177,56 @@ impl<'a, T> IndexMut for StreamWriterGuard<'a, T> { } } +// Stream reader +impl<'a, T: Copy + Sized> StreamReader { + pub fn iter(&'a self) -> StreamReaderIter<'a, T> { + StreamReaderIter { + reader: self, + current_guard: None, + current_index: 0, + } + } +} + +pub struct StreamReaderIter<'a, T> { + reader: &'a StreamReader, + current_guard: Option>, + current_index: usize, +} + +impl<'a, T: Copy> Iterator for StreamReaderIter<'a, T> { + type Item = T; + + fn next(&mut self) -> Option { + if self.current_guard.is_none() + || self.current_index == self.current_guard.as_ref().unwrap().len() + { + if self.current_guard.is_some() { + let old = self.current_guard.take(); + drop(old); + } + self.current_guard = Some(self.reader.read()); + self.current_index = 0; + } + + let guard = self.current_guard.as_ref().unwrap(); + + let ret = Some(guard[self.current_index]); + self.current_index += 1; + ret + } +} + +pub struct StreamReaderGuard<'a, T> { + inner: &'a StreamReader, + available: usize, +} + +pub struct StreamReaderGuardIter<'a, T> { + guard: &'a StreamReaderGuard<'a, T>, + current_index: usize, +} + impl StreamReader { pub fn read(&self) -> StreamReaderGuard<'_, T> { *self @@ -189,16 +242,6 @@ impl StreamReader { } } -pub struct StreamReaderGuard<'a, T> { - inner: &'a StreamReader, - available: usize, -} - -pub struct StreamReaderGuardIter<'a, T> { - guard: &'a StreamReaderGuard<'a, T>, - current_index: usize, -} - impl<'a, T> StreamReaderGuard<'a, T> { pub fn len(&self) -> usize { self.available diff --git a/src/stream_operators.rs b/src/stream_operators.rs index 82008a3..e693efc 100644 --- a/src/stream_operators.rs +++ b/src/stream_operators.rs @@ -1,4 +1,4 @@ -use std::{fmt::write, ops::Add}; +use std::ops::Add; use crate::stream::{StreamReader, stream}; @@ -8,19 +8,17 @@ pub struct StreamAdder { impl + Copy + 'static> StreamAdder { pub fn new(input_a: StreamReader, input_b: StreamReader) -> (Self, StreamReader) { - let (writer, reader) = stream::(1024); + let (mut writer, reader) = stream::(1024); std::thread::spawn(move || { - let mut writer = writer; - + //let mut writer = writer; + let mut iter1 = input_a.iter(); + let mut iter2 = input_b.iter(); loop { - let reader_a = input_a.read(); - let reader_b = input_b.read(); let mut write = writer.write(); - let len = reader_a.len().min(reader_b.len()).min(write.len()); + let len = write.len(); for i in 0..len { - write[i].write(reader_a[i] + reader_b[i]); + write[i].write(iter1.next().unwrap() + iter2.next().unwrap()); } - write.swap(len); } });