Adds binary operator

This commit is contained in:
2025-10-23 21:46:32 +02:00
parent e0dc672b58
commit 43ef82ca9c
3 changed files with 34 additions and 9 deletions

View File

@ -32,7 +32,7 @@ fn main() {
}
});
let (adder, out) = stream_operators::StreamAdder::new(a_rx, b_rx);
let (_adder, out) = stream_operators::BinaryOperator::new(a_rx, b_rx, |a, b| a + b);
{
loop {

View File

@ -201,6 +201,8 @@ impl<'a, T: Copy> Iterator for StreamReaderIter<'a, T> {
if self.current_guard.is_none()
|| self.current_index == self.current_guard.as_ref().unwrap().len()
{
// Previous guard needs to be explicitly dropped otherwise the "read" call will
// happen before the lock is dropped
if self.current_guard.is_some() {
let old = self.current_guard.take();
drop(old);

View File

@ -1,23 +1,26 @@
use std::ops::Add;
use crate::stream::{StreamReader, stream};
use crate::stream::{StreamReader, StreamWriter, stream};
pub struct StreamAdder<T> {
_phantom: std::marker::PhantomData<T>,
pub struct BinaryOperator<Ia, Ib, O> {
_phantom: std::marker::PhantomData<(Ia, Ib, O)>,
}
impl<T: Add<T, Output = T> + Copy + 'static> StreamAdder<T> {
pub fn new(input_a: StreamReader<T>, input_b: StreamReader<T>) -> (Self, StreamReader<T>) {
let (mut writer, reader) = stream::<T>(1024);
impl<Ia: Copy + 'static, Ib: Copy + 'static, O: 'static> BinaryOperator<Ia, Ib, O> {
pub fn new(
input_a: StreamReader<Ia>,
input_b: StreamReader<Ib>,
oper: fn(Ia, Ib) -> O,
) -> (Self, StreamReader<O>) {
let (mut writer, reader) = stream::<O>(1024);
std::thread::spawn(move || {
//let mut writer = writer;
let mut iter1 = input_a.iter();
let mut iter2 = input_b.iter();
loop {
let mut write = writer.write();
let len = write.len();
for i in 0..len {
write[i].write(iter1.next().unwrap() + iter2.next().unwrap());
write[i].write(oper(iter1.next().unwrap(), iter2.next().unwrap()));
}
write.swap(len);
}
@ -30,3 +33,23 @@ impl<T: Add<T, Output = T> + Copy + 'static> StreamAdder<T> {
)
}
}
pub struct StreamAdder<Ia, Ib, O> {
_bin_op: BinaryOperator<Ia, Ib, O>,
_phantom: std::marker::PhantomData<(Ia, Ib, O)>,
}
impl<Ia: Add<Ib, Output = O> + Copy + 'static, Ib: Copy + 'static, O: 'static>
StreamAdder<Ia, Ib, O>
{
pub fn new(input_a: StreamReader<Ia>, input_b: StreamReader<Ib>) -> (Self, StreamReader<O>) {
let (_bin_op, reader) = BinaryOperator::new(input_a, input_b, |a, b| a + b);
(
Self {
_phantom: Default::default(),
_bin_op,
},
reader,
)
}
}