Adds stream adder

This commit is contained in:
2025-10-23 15:33:36 +02:00
parent 915d643dfa
commit 6bc7c2ab16
3 changed files with 91 additions and 15 deletions

View File

@ -1,14 +1,16 @@
use std::time::Duration;
use rdsp::stream;
use rdsp::{stream, stream_operators};
fn main() {
let (mut a, b) = stream::stream::<u32>(16);
let (mut a_tx, a_rx) = stream::stream::<u32>(16);
let (mut b_tx, b_rx) = stream::stream::<u32>(16);
std::thread::spawn(move || {
let mut k = 0;
loop {
std::thread::sleep(Duration::from_millis(500));
let mut buf = a.write();
let mut buf = a_tx.write();
for i in 0..16 {
buf[i].write(k);
k += 1;
@ -17,10 +19,25 @@ fn main() {
}
});
std::thread::spawn(move || {
let mut k = 0;
loop {
std::thread::sleep(Duration::from_millis(500));
let mut buf = b_tx.write();
for i in 0..16 {
buf[i].write(k);
k += 1;
}
buf.swap(16);
}
});
let (adder, out) = stream_operators::StreamAdder::new(a_rx, b_rx);
{
loop {
let x = b.read();
for i in 0..16 {
let x = out.read();
for i in 0..x.len() {
println!("{}", x[i]);
}
}

View File

@ -90,6 +90,11 @@ pub struct StreamWriterGuard<'a, T> {
inner: &'a StreamWriter<T>,
}
pub struct StreamWriterGuardIter<'a, T> {
guard: &'a mut StreamWriterGuard<'a, T>,
current_index: usize,
}
impl<'a, T> StreamWriterGuard<'a, T> {
pub fn len(&self) -> usize {
self.inner.inner.buffer_size
@ -102,6 +107,28 @@ impl<'a, T> StreamWriterGuard<'a, T> {
pub fn swap(self, size: usize) {
*self.inner.inner.written.write().unwrap() = size;
}
pub fn iter(&'a mut self) -> StreamWriterGuardIter<'a, T> {
StreamWriterGuardIter {
guard: self,
current_index: 0,
}
}
}
impl<'a, T> Iterator for StreamWriterGuardIter<'a, T> {
type Item = &'a mut MaybeUninit<T>;
fn next(&mut self) -> Option<Self::Item> {
if self.current_index >= self.guard.inner.inner.buffer_size {
return None;
}
let x =
unsafe { &mut (&mut *self.guard.inner.inner.write_buffer.get())[self.current_index] };
self.current_index += 1;
Some(x)
}
}
impl<'a, T> Drop for StreamWriterGuard<'a, T> {
@ -119,7 +146,7 @@ impl<'a, T> Drop for StreamWriterGuard<'a, T> {
// Actually swap buffers
unsafe {
std::mem::swap(
std::ptr::swap(
&mut *self.inner.inner.write_buffer.get(),
&mut *self.inner.inner.read_buffer.get(),
);
@ -167,6 +194,11 @@ pub struct StreamReaderGuard<'a, T> {
available: usize,
}
pub struct StreamReaderGuardIter<'a, T> {
guard: &'a StreamReaderGuard<'a, T>,
current_index: usize,
}
impl<'a, T> StreamReaderGuard<'a, T> {
pub fn len(&self) -> usize {
self.available
@ -176,13 +208,28 @@ impl<'a, T> StreamReaderGuard<'a, T> {
self.available == 0
}
pub fn iter(&self) -> Iter<'_, T> {
unsafe {
let x = (*self.inner.inner.read_buffer.get());
pub fn iter(&self) -> StreamReaderGuardIter<'_, T> {
StreamReaderGuardIter {
guard: self,
current_index: 0,
}
}
}
impl<'a, T> Iterator for StreamReaderGuardIter<'a, T> {
type Item = &'a T;
fn next(&mut self) -> Option<Self::Item> {
if self.current_index >= self.guard.available {
return None;
}
let x = Some(self.guard.index(self.current_index));
self.current_index += 1;
x
}
}
impl<'a, T> Drop for StreamReaderGuard<'a, T> {
fn drop(&mut self) {
*self.inner.inner.swap_rdy.lock().unwrap() = true;

View File

@ -1,16 +1,28 @@
use std::ops::Add;
use std::{fmt::write, ops::Add};
use crate::stream::{StreamReader, stream};
struct StreamAdder<T> {
pub struct StreamAdder<T> {
_phantom: std::marker::PhantomData<T>,
}
impl<T: Add<Output = T> + 'static> StreamAdder<T> {
pub fn new(inputs: Vec<StreamReader<T>>) -> (Self, StreamReader<T>) {
let (mut writer, reader) = stream::<T>(1024);
impl<T: Add<T, Output = T> + Copy + 'static> StreamAdder<T> {
pub fn new(input_a: StreamReader<T>, input_b: StreamReader<T>) -> (Self, StreamReader<T>) {
let (writer, reader) = stream::<T>(1024);
std::thread::spawn(move || {
let readers = inputs.iter().map(|r| r.read()).collect::<Vec<_>>();
let mut writer = writer;
loop {
let reader_a = input_a.read();
let reader_b = input_b.read();
let mut write = writer.write();
let len = reader_a.len().min(reader_b.len()).min(write.len());
for i in 0..len {
write[i].write(reader_a[i] + reader_b[i]);
}
write.swap(len);
}
});
(
Self {