From 43ef82ca9c53bdb50f34feba97c133183426b7f6 Mon Sep 17 00:00:00 2001 From: Albin Chaboissier Date: Thu, 23 Oct 2025 21:46:32 +0200 Subject: [PATCH] Adds binary operator --- src/main.rs | 2 +- src/stream.rs | 2 ++ src/stream_operators.rs | 39 +++++++++++++++++++++++++++++++-------- 3 files changed, 34 insertions(+), 9 deletions(-) diff --git a/src/main.rs b/src/main.rs index f96f5f2..2c03de6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -32,7 +32,7 @@ fn main() { } }); - let (adder, out) = stream_operators::StreamAdder::new(a_rx, b_rx); + let (_adder, out) = stream_operators::BinaryOperator::new(a_rx, b_rx, |a, b| a + b); { loop { diff --git a/src/stream.rs b/src/stream.rs index 5d52967..ff09eaf 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -201,6 +201,8 @@ impl<'a, T: Copy> Iterator for StreamReaderIter<'a, T> { if self.current_guard.is_none() || self.current_index == self.current_guard.as_ref().unwrap().len() { + // Previous guard needs to be explicitly dropped otherwise the "read" call will + // happen before the lock is dropped if self.current_guard.is_some() { let old = self.current_guard.take(); drop(old); diff --git a/src/stream_operators.rs b/src/stream_operators.rs index e693efc..be98a3a 100644 --- a/src/stream_operators.rs +++ b/src/stream_operators.rs @@ -1,23 +1,26 @@ use std::ops::Add; -use crate::stream::{StreamReader, stream}; +use crate::stream::{StreamReader, StreamWriter, stream}; -pub struct StreamAdder { - _phantom: std::marker::PhantomData, +pub struct BinaryOperator { + _phantom: std::marker::PhantomData<(Ia, Ib, O)>, } -impl + Copy + 'static> StreamAdder { - pub fn new(input_a: StreamReader, input_b: StreamReader) -> (Self, StreamReader) { - let (mut writer, reader) = stream::(1024); +impl BinaryOperator { + pub fn new( + input_a: StreamReader, + input_b: StreamReader, + oper: fn(Ia, Ib) -> O, + ) -> (Self, StreamReader) { + let (mut writer, reader) = stream::(1024); std::thread::spawn(move || { - //let mut writer = writer; let mut iter1 = input_a.iter(); let mut iter2 = input_b.iter(); loop { let mut write = writer.write(); let len = write.len(); for i in 0..len { - write[i].write(iter1.next().unwrap() + iter2.next().unwrap()); + write[i].write(oper(iter1.next().unwrap(), iter2.next().unwrap())); } write.swap(len); } @@ -30,3 +33,23 @@ impl + Copy + 'static> StreamAdder { ) } } + +pub struct StreamAdder { + _bin_op: BinaryOperator, + _phantom: std::marker::PhantomData<(Ia, Ib, O)>, +} + +impl + Copy + 'static, Ib: Copy + 'static, O: 'static> + StreamAdder +{ + pub fn new(input_a: StreamReader, input_b: StreamReader) -> (Self, StreamReader) { + let (_bin_op, reader) = BinaryOperator::new(input_a, input_b, |a, b| a + b); + ( + Self { + _phantom: Default::default(), + _bin_op, + }, + reader, + ) + } +}