From d0773783dcc640077904f76b7120a8dad07bfc77 Mon Sep 17 00:00:00 2001 From: Albin Chaboissier Date: Wed, 11 Mar 2026 13:33:21 +0100 Subject: [PATCH] Streams --- .gitignore | 1 + Cargo.lock | 11 + oxydsp-flowgraph/src/lib.rs | 4 + oxydsp-flowgraph/src/main.rs | 71 ++++- oxydsp-flowgraph/src/stream.rs | 539 +++++++++++++++++++++++++++++++++ 5 files changed, 625 insertions(+), 1 deletion(-) create mode 100644 .gitignore create mode 100644 Cargo.lock create mode 100644 oxydsp-flowgraph/src/lib.rs create mode 100644 oxydsp-flowgraph/src/stream.rs diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..2f7896d --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +target/ diff --git a/Cargo.lock b/Cargo.lock new file mode 100644 index 0000000..2ccf4e4 --- /dev/null +++ b/Cargo.lock @@ -0,0 +1,11 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 4 + +[[package]] +name = "example" +version = "0.1.0" + +[[package]] +name = "oxydsp-flowgraph" +version = "0.1.0" diff --git a/oxydsp-flowgraph/src/lib.rs b/oxydsp-flowgraph/src/lib.rs new file mode 100644 index 0000000..c2134db --- /dev/null +++ b/oxydsp-flowgraph/src/lib.rs @@ -0,0 +1,4 @@ +// This crate manages the flowgraph datastructures and execution/scheduling +// as well as the communication between the blocks + +pub mod stream; diff --git a/oxydsp-flowgraph/src/main.rs b/oxydsp-flowgraph/src/main.rs index a4f2e4f..2030592 100644 --- a/oxydsp-flowgraph/src/main.rs +++ b/oxydsp-flowgraph/src/main.rs @@ -1,4 +1,73 @@ +use std::time::Instant; + +use oxydsp_flowgraph::stream; + fn main() { - println!("Hello, world!"); + transfer_test(); + return; + let (mut tx, mut rx) = stream::bounded_queue::(256); + + std::thread::spawn(move || { + let mut i = 0; + loop + { + let mut writer = tx.write(); + while writer.push(i).is_ok() + { + i += 1; + std::thread::yield_now(); + } + } + }); + + loop + { + let mut reader = rx.read(); + let len = reader.len(); + while let Some(x) = reader.pop() + { + println!("{len}: {x}"); + std::thread::yield_now(); + } + } +} + +fn transfer_test() +{ + let (mut tx, mut rx) = stream::bounded_queue::(256); + let count = 1_000_000_000; + + let start = Instant::now(); + std::thread::spawn(move || { + let mut i = 0; + while i <= count + { + let mut writer = tx.write(); + let mut batch_size = 0; + while i <= count && batch_size < 128 && writer.push(i).is_ok() + { + i += 1; + batch_size += 1; + } + } + }); + + let mut j = 0; + while j <= count + { + let mut reader = rx.read(); + while let Some(x) = reader.pop() + { + assert_eq!(x, j); + j += 1; + } + } + let end = Instant::now(); + let time = (end - start).as_secs_f32(); + println!( + "Transfer test: {:.2}s, {:.2} MT/s", + time, + count as f32 / (1_000_000. * time) + ); } diff --git a/oxydsp-flowgraph/src/stream.rs b/oxydsp-flowgraph/src/stream.rs new file mode 100644 index 0000000..daaf110 --- /dev/null +++ b/oxydsp-flowgraph/src/stream.rs @@ -0,0 +1,539 @@ +use std::cell::UnsafeCell; +use std::mem::MaybeUninit; +use std::ops::Deref; +use std::sync::Arc; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering; + +// Represents a single producer, single consumer queue +pub struct Stream +{ + buffer: Box]>>, + capacity_mask: usize, + + head: CachePadded, + tail: CachePadded, +} + +#[repr(align(64))] +pub struct CachePadded(T); + +impl CachePadded +{ + pub fn new(element: T) -> Self + { + CachePadded(element) + } +} + +impl Deref for CachePadded +{ + type Target = T; + + fn deref(&self) -> &Self::Target + { + &self.0 + } +} + +pub struct StreamProducer +{ + inner: Arc>, +} + +pub struct StreamConsumer +{ + inner: Arc>, +} + +unsafe impl Send for StreamProducer {} +unsafe impl Sync for StreamProducer {} + +unsafe impl Send for StreamConsumer {} +unsafe impl Sync for StreamConsumer {} + +// Represents a write operation within a stream producer +pub struct StreamWriter<'a, T> +{ + producer: &'a StreamProducer, + first: &'a mut [MaybeUninit], + second: Option<&'a mut [MaybeUninit]>, + written: usize, +} + +// Represents a read operation within a stream producer +pub struct StreamReader<'a, T> +{ + producer: &'a StreamConsumer, + first: &'a mut [MaybeUninit], + second: Option<&'a mut [MaybeUninit]>, + read: usize, +} + +pub fn bounded_queue(capacity: usize) -> (StreamProducer, StreamConsumer) +{ + // Require stream capacity to be a power of two for fast modulo computation + assert!( + capacity.count_ones() == 1, + "Stream capacity must be a power of 2." + ); + let slice = (0..capacity) + .map(|_| MaybeUninit::uninit()) + .collect::>() + .into_boxed_slice(); + let cell = UnsafeCell::from_mut(Box::leak(slice)); + + // SAFETY: + // Cell was created in a box, and UnsafeCell is a ZST, + // the memory layout pointed to by cell is the one a box would thus + // expect + let buffer = unsafe { Box::from_raw(cell as *mut UnsafeCell<[MaybeUninit]>) }; + + // DBG + // let buffer = unsafe { + // Box::from_raw(std::mem::transmute( + // cell as *mut UnsafeCell<[MaybeUninit]>, + // )) + // }; + // DBG + + let queue = Arc::new(Stream { + buffer, + capacity_mask: capacity - 1, + head: CachePadded::new(0.into()), + tail: CachePadded::new(0.into()), + }); + + ( + StreamProducer { + inner: queue.clone(), + }, + StreamConsumer { + inner: queue.clone(), + }, + ) +} + +impl StreamProducer +{ + pub fn write<'a>(&'a mut self) -> StreamWriter<'a, T> + { + // We need to claim the maximum amount of elements. + let tail = self.inner.tail.load(Ordering::Acquire); + let head = self.inner.head.load(Ordering::Relaxed); + + let wrapped_tail = tail & self.inner.capacity_mask; + let wrapped_head = head & self.inner.capacity_mask; + // While we have this information, the back can progress, but never in an unsafe way + // Since it can only free up spots + if head == tail + { + // Current configuration + // + // ▯▯▯▯▯▯▯▯▯▯▯▯▯ + // | + // tail & head + // (empty) + // ______.______ + // slice2 slice1 + + // SAFETY: + // + // Since the StreamWriter object borrows the producer + // we will return the only, mutually exclusive, mutable refences to the empty part of the buffer + // + // We know as per the previous diagram + // That both slices are mutually exclusive + // and that tail and head are indices of the slice + unsafe { + let k = &mut *self.inner.buffer.get(); + + let (start_to_head, head_to_end) = k.split_at_mut_unchecked(wrapped_head); + let (start_to_tail, _tail_to_head) = + start_to_head.split_at_mut_unchecked(wrapped_tail); + + let first = head_to_end; + let second = Some(start_to_tail); + StreamWriter { + producer: self, + first, + second, + written: 0, + } + } + } + else + { + // We MUST have : tail < head + + if wrapped_tail < wrapped_head + { + // + // Or + // ▯▯▯▯▯▯▯▯▯▯▯▯▯ + // | + // tail & head + // (empty) + // Current configuration : + // ▯▯▯▮▮▮▮▮▮▯▯▯▯ + // | | + // tail head + // ___ ____ + // slice1 slice2 + + // SAFETY: + // + // Since the StreamWriter object borrows the producer + // we will return the only, mutually exclusive, mutable refences to the empty part of the buffer + // + // We know as per the previous diagram + // That both slices are mutually exclusive + // and that tail and head are indices of the slice + unsafe { + let k = &mut *self.inner.buffer.get(); + + let (start_to_head, head_to_end) = k.split_at_mut_unchecked(wrapped_head); + let (start_to_tail, _tail_to_head) = + start_to_head.split_at_mut_unchecked(wrapped_tail); + + let first = head_to_end; + let second = Some(start_to_tail); + StreamWriter { + producer: self, + first, + second, + written: 0, + } + } + } + else + { + // Current configuration : + // ▮▮▮▯▯▯▯▯▯▮▮▮▮ + // | | + // head tail + // _____ + // slice1 + // + // Or + // ▮▮▮▮▮▮▮▮▮▮▮▮▮ + // | + // tail & head + // (full) + // ______.______ + // slice2 slice1 + + // SAFETY: + // + // Since the StreamWriter object borrows the producer + // we will return the only mutable refence to the empty part of the buffer + // + // Head and tail are both indices of the slice + unsafe { + let k = &mut *self.inner.buffer.get(); + StreamWriter { + producer: self, + first: &mut k[wrapped_head..wrapped_tail], + second: None, + written: 0, + } + } + } + } + } +} + +impl StreamConsumer +{ + pub fn read<'a>(&'a mut self) -> StreamReader<'a, T> + { + // We need to claim the maximum amount of elements. + let head = self.inner.head.load(Ordering::Acquire); + let tail = self.inner.tail.load(Ordering::Relaxed); + + let wrapped_tail = tail & self.inner.capacity_mask; + let wrapped_head = head & self.inner.capacity_mask; + + // While we have this information, the back can progress, but never in an unsafe way + // Since it can only free up spots + + if tail == head + { + // Buffer is empty. Return empty slice + unsafe { + let k = &mut *self.inner.buffer.get(); + StreamReader { + producer: self, + first: &mut k[wrapped_tail..wrapped_head], + second: None, + read: 0, + } + } + } + else + { + // Necessarly: wrapped_tail < wrapped_head + // Two cases : The buffer overlaps the wrapping or not + if wrapped_tail < wrapped_head + { + // Current configuration : + // ▯▯▯▮▮▮▮▮▮▯▯▯▯ + // | | + // tail head + // _______ + // slice1 + + // SAFETY: + // + // Since the StreamWriter object borrows the producer + // we will return the only mutable refence to the empty part of the buffer + // + // Head and tail are both indices of the slice + unsafe { + let k = &mut *self.inner.buffer.get(); + StreamReader { + producer: self, + first: &mut k[wrapped_tail..wrapped_head], + second: None, + read: 0, + } + } + } + else + { + // Current configuration : + // ▮▮▮▯▯▯▯▯▯▮▮▮▮ + // | | + // head tail + // ___ ____ + // slice2 slice1 + // + // Or + // ▮▮▮▮▮▮▮▮▮▮▮▮▮ + // | + // tail & head + // (full) + // ______.______ + // slice2 slice1 + + // SAFETY: + // + // Since the StreamWriter object borrows the producer + // we will return the only, mutually exclusive, mutable refences to the empty part of the buffer + // + // We know as per the previous diagram + // That both slices are mutually exclusive + // and that tail and head are indices of the slice + unsafe { + let k = &mut *self.inner.buffer.get(); + + let (start_to_tail, tail_to_end) = k.split_at_mut_unchecked(wrapped_tail); + let (start_to_head, _head_to_tail) = + start_to_tail.split_at_mut_unchecked(wrapped_head); + + let first = tail_to_end; + let second = Some(start_to_head); + StreamReader { + producer: self, + first, + second, + read: 0, + } + } + } + } + } +} + +impl<'a, T> StreamWriter<'a, T> +{ + pub fn len(&self) -> usize + { + self.first.len() + + match &self.second + { + Some(x) => x.len(), + None => 0, + } + } + + pub fn is_empty(&self) -> bool + { + self.len() == 0 + } + + pub fn push(&mut self, element: T) -> Result<(), T> + { + if self.written < self.first.len() + { + self.first[self.written] = MaybeUninit::new(element); + self.written += 1; + Ok(()) + } + else if let Some(second) = &mut self.second + && self.written - self.first.len() < second.len() + { + second[self.written - self.first.len()] = MaybeUninit::new(element); + self.written += 1; + Ok(()) + } + else + { + Err(element) + } + } +} + +impl<'a, T> StreamReader<'a, T> +{ + pub fn len(&self) -> usize + { + self.first.len() + + match &self.second + { + Some(x) => x.len(), + None => 0, + } + } + + pub fn is_empty(&self) -> bool + { + self.len() == 0 + } + + pub fn pop(&mut self) -> Option + { + if self.read < self.first.len() + { + // SAFETY: + // + // If element is in this slice, it is initialized. + // We take it once since read increases + let element = unsafe { + std::mem::replace(&mut self.first[self.read], MaybeUninit::uninit()).assume_init() + }; + self.read += 1; + Some(element) + } + else if let Some(second) = &mut self.second + && self.read - self.first.len() < second.len() + { + let element = unsafe { + std::mem::replace( + &mut second[self.read - self.first.len()], + MaybeUninit::uninit(), + ) + .assume_init() + }; + self.read += 1; + Some(element) + } + else + { + None + } + } +} + +// When a Stream writer goes out of scope, it wrote +// some things into the stream. These things need to be commited to the queue +impl<'a, T> Drop for StreamWriter<'a, T> +{ + fn drop(&mut self) + { + // Advance head. + // We know that this value hasn't changed since this StreamWriter was created + let head = self.producer.inner.head.load(Ordering::Relaxed); + + // We want writes to the buffer to be visible when acquired in the pop side + self.producer + .inner + .head + .store(head + self.written, Ordering::Release); + } +} + +// When a Stream reader goes out of scope, it took +// some things from the stream. These things need to be de-commited to the queue +impl<'a, T> Drop for StreamReader<'a, T> +{ + fn drop(&mut self) + { + // Advance tail. + // We know that this value hasn't changed since this StreamWriter was created + let tail = self.producer.inner.tail.load(Ordering::Relaxed); + + // We want writes to the buffer to be visible when acquired in the push side + self.producer + .inner + .tail + .store(tail + self.read, Ordering::Release); + } +} + +mod test +{ + #[allow(unused_imports)] + use crate::stream::bounded_queue; + + // Test push and pop on single thread + #[test] + pub fn stream_simple_push_pop() + { + let (mut tx, mut rx) = bounded_queue::(4); + + { + let mut writer = tx.write(); + + assert_eq!(writer.len(), 4); + + assert_eq!(writer.push(1), Ok(())); + assert_eq!(writer.push(2), Ok(())); + assert_eq!(writer.push(3), Ok(())); + assert_eq!(writer.push(4), Ok(())); + assert_eq!(writer.push(5), Err(5)); + } + + { + let mut reader = rx.read(); + + assert_eq!(reader.len(), 4); + + assert_eq!(reader.pop(), Some(1)); + assert_eq!(reader.pop(), Some(2)); + assert_eq!(reader.pop(), Some(3)); + assert_eq!(reader.pop(), Some(4)); + assert_eq!(reader.pop(), None); + } + + // Put stream into weird situatino + { + let mut writer = tx.write(); + assert_eq!(writer.push(1), Ok(())); + assert_eq!(writer.push(2), Ok(())); + assert_eq!(writer.push(3), Ok(())); + assert_eq!(writer.push(4), Ok(())); + } + + { + let mut reader = rx.read(); + assert_eq!(reader.pop(), Some(1)); + assert_eq!(reader.pop(), Some(2)); + } + + { + let mut writer = tx.write(); + assert_eq!(writer.len(), 2); + assert_eq!(writer.push(5), Ok(())); + assert_eq!(writer.push(6), Ok(())); + } + + { + let mut reader = rx.read(); + assert_eq!(reader.pop(), Some(3)); + assert_eq!(reader.pop(), Some(4)); + assert_eq!(reader.pop(), Some(5)); + assert_eq!(reader.pop(), Some(6)); + } + } +}