diff --git a/src/main.rs b/src/main.rs index e50b36b..989a074 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,14 +1,16 @@ use std::time::Duration; -use rdsp::stream; +use rdsp::{stream, stream_operators}; fn main() { - let (mut a, b) = stream::stream::(16); + let (mut a_tx, a_rx) = stream::stream::(16); + let (mut b_tx, b_rx) = stream::stream::(16); + std::thread::spawn(move || { let mut k = 0; loop { std::thread::sleep(Duration::from_millis(500)); - let mut buf = a.write(); + let mut buf = a_tx.write(); for i in 0..16 { buf[i].write(k); k += 1; @@ -17,10 +19,25 @@ fn main() { } }); + std::thread::spawn(move || { + let mut k = 0; + loop { + std::thread::sleep(Duration::from_millis(500)); + let mut buf = b_tx.write(); + for i in 0..16 { + buf[i].write(k); + k += 1; + } + buf.swap(16); + } + }); + + let (adder, out) = stream_operators::StreamAdder::new(a_rx, b_rx); + { loop { - let x = b.read(); - for i in 0..16 { + let x = out.read(); + for i in 0..x.len() { println!("{}", x[i]); } } diff --git a/src/stream.rs b/src/stream.rs index f155deb..7f8562c 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -90,6 +90,11 @@ pub struct StreamWriterGuard<'a, T> { inner: &'a StreamWriter, } +pub struct StreamWriterGuardIter<'a, T> { + guard: &'a mut StreamWriterGuard<'a, T>, + current_index: usize, +} + impl<'a, T> StreamWriterGuard<'a, T> { pub fn len(&self) -> usize { self.inner.inner.buffer_size @@ -102,6 +107,28 @@ impl<'a, T> StreamWriterGuard<'a, T> { pub fn swap(self, size: usize) { *self.inner.inner.written.write().unwrap() = size; } + + pub fn iter(&'a mut self) -> StreamWriterGuardIter<'a, T> { + StreamWriterGuardIter { + guard: self, + current_index: 0, + } + } +} + +impl<'a, T> Iterator for StreamWriterGuardIter<'a, T> { + type Item = &'a mut MaybeUninit; + + fn next(&mut self) -> Option { + if self.current_index >= self.guard.inner.inner.buffer_size { + return None; + } + + let x = + unsafe { &mut (&mut *self.guard.inner.inner.write_buffer.get())[self.current_index] }; + self.current_index += 1; + Some(x) + } } impl<'a, T> Drop for StreamWriterGuard<'a, T> { @@ -119,7 +146,7 @@ impl<'a, T> Drop for StreamWriterGuard<'a, T> { // Actually swap buffers unsafe { - std::mem::swap( + std::ptr::swap( &mut *self.inner.inner.write_buffer.get(), &mut *self.inner.inner.read_buffer.get(), ); @@ -167,6 +194,11 @@ pub struct StreamReaderGuard<'a, T> { available: usize, } +pub struct StreamReaderGuardIter<'a, T> { + guard: &'a StreamReaderGuard<'a, T>, + current_index: usize, +} + impl<'a, T> StreamReaderGuard<'a, T> { pub fn len(&self) -> usize { self.available @@ -176,13 +208,28 @@ impl<'a, T> StreamReaderGuard<'a, T> { self.available == 0 } - pub fn iter(&self) -> Iter<'_, T> { - unsafe { - let x = (*self.inner.inner.read_buffer.get()); + pub fn iter(&self) -> StreamReaderGuardIter<'_, T> { + StreamReaderGuardIter { + guard: self, + current_index: 0, } } } +impl<'a, T> Iterator for StreamReaderGuardIter<'a, T> { + type Item = &'a T; + + fn next(&mut self) -> Option { + if self.current_index >= self.guard.available { + return None; + } + + let x = Some(self.guard.index(self.current_index)); + self.current_index += 1; + x + } +} + impl<'a, T> Drop for StreamReaderGuard<'a, T> { fn drop(&mut self) { *self.inner.inner.swap_rdy.lock().unwrap() = true; diff --git a/src/stream_operators.rs b/src/stream_operators.rs index 3765938..82008a3 100644 --- a/src/stream_operators.rs +++ b/src/stream_operators.rs @@ -1,16 +1,28 @@ -use std::ops::Add; +use std::{fmt::write, ops::Add}; use crate::stream::{StreamReader, stream}; -struct StreamAdder { +pub struct StreamAdder { _phantom: std::marker::PhantomData, } -impl + 'static> StreamAdder { - pub fn new(inputs: Vec>) -> (Self, StreamReader) { - let (mut writer, reader) = stream::(1024); +impl + Copy + 'static> StreamAdder { + pub fn new(input_a: StreamReader, input_b: StreamReader) -> (Self, StreamReader) { + let (writer, reader) = stream::(1024); std::thread::spawn(move || { - let readers = inputs.iter().map(|r| r.read()).collect::>(); + let mut writer = writer; + + loop { + let reader_a = input_a.read(); + let reader_b = input_b.read(); + let mut write = writer.write(); + let len = reader_a.len().min(reader_b.len()).min(write.len()); + for i in 0..len { + write[i].write(reader_a[i] + reader_b[i]); + } + + write.swap(len); + } }); ( Self {