From 092fb0191f28f177c01efcc18eb552933a67a072 Mon Sep 17 00:00:00 2001 From: Albin Chaboissier Date: Thu, 12 Mar 2026 21:17:38 +0100 Subject: [PATCH] Starting BlockSync --- oxydsp-flowgraph/src/main.rs | 13 ++--- oxydsp-flowgraph/src/stream.rs | 86 +++++++++++++++++++++------------- 2 files changed, 60 insertions(+), 39 deletions(-) diff --git a/oxydsp-flowgraph/src/main.rs b/oxydsp-flowgraph/src/main.rs index 1c8b0e4..1382a61 100644 --- a/oxydsp-flowgraph/src/main.rs +++ b/oxydsp-flowgraph/src/main.rs @@ -1,8 +1,8 @@ -use oxydsp_flowgraph::{ - block::SyncBlock, - edge::{In, Out}, -}; -use oxydsp_flowgraph_macros::{BlockIO, sync_block}; +use oxydsp_flowgraph::block::SyncBlock; +use oxydsp_flowgraph::edge::In; +use oxydsp_flowgraph::edge::Out; +use oxydsp_flowgraph_macros::BlockIO; +use oxydsp_flowgraph_macros::sync_block; #[derive(BlockIO)] //#[sync_block] @@ -24,9 +24,10 @@ impl oxydsp_flowgraph::block::Block for Test len = len.min(input_reader.len()); let mut output_writer = self.output.write(); len = len.min(output_writer.len()); + //let input = input_reader.pop().unwrap(); + let input = 0; for _ in 0..len { - let input = input_reader.pop().unwrap(); let (output_out,) = self.sync_work((input,)); output_writer.push(output_out).unwrap(); } diff --git a/oxydsp-flowgraph/src/stream.rs b/oxydsp-flowgraph/src/stream.rs index e79ace6..fb3be77 100644 --- a/oxydsp-flowgraph/src/stream.rs +++ b/oxydsp-flowgraph/src/stream.rs @@ -55,9 +55,11 @@ unsafe impl Sync for StreamConsumer {} // Represents a write operation within a stream producer pub struct StreamWriter<'a, T> { - producer: &'a StreamProducer, + producer: &'a mut StreamProducer, first: &'a UnsafeCell<[MaybeUninit]>, second: Option<&'a UnsafeCell<[MaybeUninit]>>, + first_len: usize, + second_len: usize, written: usize, } @@ -65,8 +67,10 @@ pub struct StreamWriter<'a, T> pub struct StreamReader<'a, T> { producer: &'a StreamConsumer, - first: &'a mut [MaybeUninit], - second: Option<&'a mut [MaybeUninit]>, + first: &'a UnsafeCell<[MaybeUninit]>, + second: Option<&'a UnsafeCell<[MaybeUninit]>>, + first_len: usize, + second_len: usize, read: usize, } @@ -152,12 +156,16 @@ impl StreamProducer // This functions borrows the stream mutably. As such, only one instance // of StreamWriter can exist for a given stream. The StreamWriter // is thus the only on able to write or read the stream when it lives - let first = head_to_end; - let second = Some(start_to_tail); + let first_len = head_to_end.len(); + let second_len = start_to_tail.len(); + let first = std::mem::transmute(head_to_end); + let second = Some(std::mem::transmute(start_to_tail)); StreamWriter { producer: self, first, second, + first_len, + second_len, written: 0, } } @@ -196,12 +204,16 @@ impl StreamProducer let (start_to_tail, _tail_to_head) = start_to_head.split_at_mut_unchecked(wrapped_tail); - let first = head_to_end; - let second = Some(start_to_tail); + let first_len = head_to_end.len(); + let second_len = start_to_tail.len(); + let first = std::mem::transmute(head_to_end); + let second = Some(std::mem::transmute(start_to_tail)); StreamWriter { producer: self, first, second, + first_len, + second_len, written: 0, } } @@ -233,7 +245,9 @@ impl StreamProducer let k = &mut *self.inner.buffer.get(); StreamWriter { producer: self, - first: &mut k[wrapped_head..wrapped_tail], + first_len: k.len(), + second_len: 0, + first: std::mem::transmute(&k[wrapped_head..wrapped_tail]), second: None, written: 0, } @@ -264,7 +278,9 @@ impl StreamConsumer let k = &mut *self.inner.buffer.get(); StreamReader { producer: self, - first: &mut k[wrapped_tail..wrapped_head], + first_len: k.len(), + second_len: 0, + first: std::mem::transmute(&k[wrapped_tail..wrapped_head]), second: None, read: 0, } @@ -293,7 +309,9 @@ impl StreamConsumer let k = &mut *self.inner.buffer.get(); StreamReader { producer: self, - first: &mut k[wrapped_tail..wrapped_head], + first_len: k.len(), + second_len: 0, + first: std::mem::transmute(&k[wrapped_tail..wrapped_head]), second: None, read: 0, } @@ -331,12 +349,16 @@ impl StreamConsumer let (start_to_head, _head_to_tail) = start_to_tail.split_at_mut_unchecked(wrapped_head); - let first = tail_to_end; - let second = Some(start_to_head); + let first_len = tail_to_end.len(); + let second_len = start_to_head.len(); + let first = std::mem::transmute(tail_to_end); + let second = Some(std::mem::transmute(start_to_head)); StreamReader { producer: self, first, second, + first_len, + second_len, read: 0, } } @@ -349,12 +371,7 @@ impl<'a, T> StreamWriter<'a, T> { pub fn len(&self) -> usize { - self.first.len() - + match &self.second - { - Some(x) => x.len(), - None => 0, - } + self.first_len + self.second_len } pub fn is_empty(&self) -> bool @@ -362,18 +379,22 @@ impl<'a, T> StreamWriter<'a, T> self.len() == 0 } - pub fn push(&self, element: T) -> Result<(), T> + pub fn push(&mut self, element: T) -> Result<(), T> { - if self.written < self.first.len() + if self.written < self.first_len { - self.first[self.written] = MaybeUninit::new(element); + unsafe { + (&mut *self.first.get())[self.written] = MaybeUninit::new(element); + } self.written += 1; Ok(()) } else if let Some(second) = &mut self.second - && self.written - self.first.len() < second.len() + && self.written - self.first_len < self.second_len { - second[self.written - self.first.len()] = MaybeUninit::new(element); + unsafe { + (&mut *second.get())[self.written - self.first_len] = MaybeUninit::new(element); + } self.written += 1; Ok(()) } @@ -388,12 +409,7 @@ impl<'a, T> StreamReader<'a, T> { pub fn len(&self) -> usize { - self.first.len() - + match &self.second - { - Some(x) => x.len(), - None => 0, - } + self.first_len + self.second_len } pub fn is_empty(&self) -> bool @@ -403,24 +419,28 @@ impl<'a, T> StreamReader<'a, T> pub fn pop(&mut self) -> Option { - if self.read < self.first.len() + if self.read < self.first_len { // SAFETY: // // If element is in this slice, it is initialized. // We take it once since read increases let element = unsafe { - std::mem::replace(&mut self.first[self.read], MaybeUninit::uninit()).assume_init() + std::mem::replace( + &mut (&mut *self.first.get())[self.read], + MaybeUninit::uninit(), + ) + .assume_init() }; self.read += 1; Some(element) } else if let Some(second) = &mut self.second - && self.read - self.first.len() < second.len() + && self.read - self.first_len < self.second_len { let element = unsafe { std::mem::replace( - &mut second[self.read - self.first.len()], + &mut (&mut *second.get())[self.read - self.first_len], MaybeUninit::uninit(), ) .assume_init()