Streams
This commit is contained in:
1
.gitignore
vendored
Normal file
1
.gitignore
vendored
Normal file
@ -0,0 +1 @@
|
||||
target/
|
||||
11
Cargo.lock
generated
Normal file
11
Cargo.lock
generated
Normal file
@ -0,0 +1,11 @@
|
||||
# This file is automatically @generated by Cargo.
|
||||
# It is not intended for manual editing.
|
||||
version = 4
|
||||
|
||||
[[package]]
|
||||
name = "example"
|
||||
version = "0.1.0"
|
||||
|
||||
[[package]]
|
||||
name = "oxydsp-flowgraph"
|
||||
version = "0.1.0"
|
||||
4
oxydsp-flowgraph/src/lib.rs
Normal file
4
oxydsp-flowgraph/src/lib.rs
Normal file
@ -0,0 +1,4 @@
|
||||
// This crate manages the flowgraph datastructures and execution/scheduling
|
||||
// as well as the communication between the blocks
|
||||
|
||||
pub mod stream;
|
||||
@ -1,4 +1,73 @@
|
||||
use std::time::Instant;
|
||||
|
||||
use oxydsp_flowgraph::stream;
|
||||
|
||||
fn main()
|
||||
{
|
||||
println!("Hello, world!");
|
||||
transfer_test();
|
||||
return;
|
||||
let (mut tx, mut rx) = stream::bounded_queue::<usize>(256);
|
||||
|
||||
std::thread::spawn(move || {
|
||||
let mut i = 0;
|
||||
loop
|
||||
{
|
||||
let mut writer = tx.write();
|
||||
while writer.push(i).is_ok()
|
||||
{
|
||||
i += 1;
|
||||
std::thread::yield_now();
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
loop
|
||||
{
|
||||
let mut reader = rx.read();
|
||||
let len = reader.len();
|
||||
while let Some(x) = reader.pop()
|
||||
{
|
||||
println!("{len}: {x}");
|
||||
std::thread::yield_now();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn transfer_test()
|
||||
{
|
||||
let (mut tx, mut rx) = stream::bounded_queue::<usize>(256);
|
||||
let count = 1_000_000_000;
|
||||
|
||||
let start = Instant::now();
|
||||
std::thread::spawn(move || {
|
||||
let mut i = 0;
|
||||
while i <= count
|
||||
{
|
||||
let mut writer = tx.write();
|
||||
let mut batch_size = 0;
|
||||
while i <= count && batch_size < 128 && writer.push(i).is_ok()
|
||||
{
|
||||
i += 1;
|
||||
batch_size += 1;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let mut j = 0;
|
||||
while j <= count
|
||||
{
|
||||
let mut reader = rx.read();
|
||||
while let Some(x) = reader.pop()
|
||||
{
|
||||
assert_eq!(x, j);
|
||||
j += 1;
|
||||
}
|
||||
}
|
||||
let end = Instant::now();
|
||||
let time = (end - start).as_secs_f32();
|
||||
println!(
|
||||
"Transfer test: {:.2}s, {:.2} MT/s",
|
||||
time,
|
||||
count as f32 / (1_000_000. * time)
|
||||
);
|
||||
}
|
||||
|
||||
539
oxydsp-flowgraph/src/stream.rs
Normal file
539
oxydsp-flowgraph/src/stream.rs
Normal file
@ -0,0 +1,539 @@
|
||||
use std::cell::UnsafeCell;
|
||||
use std::mem::MaybeUninit;
|
||||
use std::ops::Deref;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::AtomicUsize;
|
||||
use std::sync::atomic::Ordering;
|
||||
|
||||
// Represents a single producer, single consumer queue
|
||||
pub struct Stream<T>
|
||||
{
|
||||
buffer: Box<UnsafeCell<[MaybeUninit<T>]>>,
|
||||
capacity_mask: usize,
|
||||
|
||||
head: CachePadded<AtomicUsize>,
|
||||
tail: CachePadded<AtomicUsize>,
|
||||
}
|
||||
|
||||
#[repr(align(64))]
|
||||
pub struct CachePadded<T>(T);
|
||||
|
||||
impl<T> CachePadded<T>
|
||||
{
|
||||
pub fn new(element: T) -> Self
|
||||
{
|
||||
CachePadded(element)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Deref for CachePadded<T>
|
||||
{
|
||||
type Target = T;
|
||||
|
||||
fn deref(&self) -> &Self::Target
|
||||
{
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
pub struct StreamProducer<T>
|
||||
{
|
||||
inner: Arc<Stream<T>>,
|
||||
}
|
||||
|
||||
pub struct StreamConsumer<T>
|
||||
{
|
||||
inner: Arc<Stream<T>>,
|
||||
}
|
||||
|
||||
unsafe impl<T: Send> Send for StreamProducer<T> {}
|
||||
unsafe impl<T> Sync for StreamProducer<T> {}
|
||||
|
||||
unsafe impl<T: Send> Send for StreamConsumer<T> {}
|
||||
unsafe impl<T> Sync for StreamConsumer<T> {}
|
||||
|
||||
// Represents a write operation within a stream producer
|
||||
pub struct StreamWriter<'a, T>
|
||||
{
|
||||
producer: &'a StreamProducer<T>,
|
||||
first: &'a mut [MaybeUninit<T>],
|
||||
second: Option<&'a mut [MaybeUninit<T>]>,
|
||||
written: usize,
|
||||
}
|
||||
|
||||
// Represents a read operation within a stream producer
|
||||
pub struct StreamReader<'a, T>
|
||||
{
|
||||
producer: &'a StreamConsumer<T>,
|
||||
first: &'a mut [MaybeUninit<T>],
|
||||
second: Option<&'a mut [MaybeUninit<T>]>,
|
||||
read: usize,
|
||||
}
|
||||
|
||||
pub fn bounded_queue<T>(capacity: usize) -> (StreamProducer<T>, StreamConsumer<T>)
|
||||
{
|
||||
// Require stream capacity to be a power of two for fast modulo computation
|
||||
assert!(
|
||||
capacity.count_ones() == 1,
|
||||
"Stream capacity must be a power of 2."
|
||||
);
|
||||
let slice = (0..capacity)
|
||||
.map(|_| MaybeUninit::uninit())
|
||||
.collect::<Vec<_>>()
|
||||
.into_boxed_slice();
|
||||
let cell = UnsafeCell::from_mut(Box::leak(slice));
|
||||
|
||||
// SAFETY:
|
||||
// Cell was created in a box, and UnsafeCell is a ZST,
|
||||
// the memory layout pointed to by cell is the one a box would thus
|
||||
// expect
|
||||
let buffer = unsafe { Box::from_raw(cell as *mut UnsafeCell<[MaybeUninit<T>]>) };
|
||||
|
||||
// DBG
|
||||
// let buffer = unsafe {
|
||||
// Box::from_raw(std::mem::transmute(
|
||||
// cell as *mut UnsafeCell<[MaybeUninit<usize>]>,
|
||||
// ))
|
||||
// };
|
||||
// DBG
|
||||
|
||||
let queue = Arc::new(Stream {
|
||||
buffer,
|
||||
capacity_mask: capacity - 1,
|
||||
head: CachePadded::new(0.into()),
|
||||
tail: CachePadded::new(0.into()),
|
||||
});
|
||||
|
||||
(
|
||||
StreamProducer {
|
||||
inner: queue.clone(),
|
||||
},
|
||||
StreamConsumer {
|
||||
inner: queue.clone(),
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
impl<T> StreamProducer<T>
|
||||
{
|
||||
pub fn write<'a>(&'a mut self) -> StreamWriter<'a, T>
|
||||
{
|
||||
// We need to claim the maximum amount of elements.
|
||||
let tail = self.inner.tail.load(Ordering::Acquire);
|
||||
let head = self.inner.head.load(Ordering::Relaxed);
|
||||
|
||||
let wrapped_tail = tail & self.inner.capacity_mask;
|
||||
let wrapped_head = head & self.inner.capacity_mask;
|
||||
// While we have this information, the back can progress, but never in an unsafe way
|
||||
// Since it can only free up spots
|
||||
if head == tail
|
||||
{
|
||||
// Current configuration
|
||||
//
|
||||
// ▯▯▯▯▯▯▯▯▯▯▯▯▯
|
||||
// |
|
||||
// tail & head
|
||||
// (empty)
|
||||
// ______.______
|
||||
// slice2 slice1
|
||||
|
||||
// SAFETY:
|
||||
//
|
||||
// Since the StreamWriter object borrows the producer
|
||||
// we will return the only, mutually exclusive, mutable refences to the empty part of the buffer
|
||||
//
|
||||
// We know as per the previous diagram
|
||||
// That both slices are mutually exclusive
|
||||
// and that tail and head are indices of the slice
|
||||
unsafe {
|
||||
let k = &mut *self.inner.buffer.get();
|
||||
|
||||
let (start_to_head, head_to_end) = k.split_at_mut_unchecked(wrapped_head);
|
||||
let (start_to_tail, _tail_to_head) =
|
||||
start_to_head.split_at_mut_unchecked(wrapped_tail);
|
||||
|
||||
let first = head_to_end;
|
||||
let second = Some(start_to_tail);
|
||||
StreamWriter {
|
||||
producer: self,
|
||||
first,
|
||||
second,
|
||||
written: 0,
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// We MUST have : tail < head
|
||||
|
||||
if wrapped_tail < wrapped_head
|
||||
{
|
||||
//
|
||||
// Or
|
||||
// ▯▯▯▯▯▯▯▯▯▯▯▯▯
|
||||
// |
|
||||
// tail & head
|
||||
// (empty)
|
||||
// Current configuration :
|
||||
// ▯▯▯▮▮▮▮▮▮▯▯▯▯
|
||||
// | |
|
||||
// tail head
|
||||
// ___ ____
|
||||
// slice1 slice2
|
||||
|
||||
// SAFETY:
|
||||
//
|
||||
// Since the StreamWriter object borrows the producer
|
||||
// we will return the only, mutually exclusive, mutable refences to the empty part of the buffer
|
||||
//
|
||||
// We know as per the previous diagram
|
||||
// That both slices are mutually exclusive
|
||||
// and that tail and head are indices of the slice
|
||||
unsafe {
|
||||
let k = &mut *self.inner.buffer.get();
|
||||
|
||||
let (start_to_head, head_to_end) = k.split_at_mut_unchecked(wrapped_head);
|
||||
let (start_to_tail, _tail_to_head) =
|
||||
start_to_head.split_at_mut_unchecked(wrapped_tail);
|
||||
|
||||
let first = head_to_end;
|
||||
let second = Some(start_to_tail);
|
||||
StreamWriter {
|
||||
producer: self,
|
||||
first,
|
||||
second,
|
||||
written: 0,
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// Current configuration :
|
||||
// ▮▮▮▯▯▯▯▯▯▮▮▮▮
|
||||
// | |
|
||||
// head tail
|
||||
// _____
|
||||
// slice1
|
||||
//
|
||||
// Or
|
||||
// ▮▮▮▮▮▮▮▮▮▮▮▮▮
|
||||
// |
|
||||
// tail & head
|
||||
// (full)
|
||||
// ______.______
|
||||
// slice2 slice1
|
||||
|
||||
// SAFETY:
|
||||
//
|
||||
// Since the StreamWriter object borrows the producer
|
||||
// we will return the only mutable refence to the empty part of the buffer
|
||||
//
|
||||
// Head and tail are both indices of the slice
|
||||
unsafe {
|
||||
let k = &mut *self.inner.buffer.get();
|
||||
StreamWriter {
|
||||
producer: self,
|
||||
first: &mut k[wrapped_head..wrapped_tail],
|
||||
second: None,
|
||||
written: 0,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> StreamConsumer<T>
|
||||
{
|
||||
pub fn read<'a>(&'a mut self) -> StreamReader<'a, T>
|
||||
{
|
||||
// We need to claim the maximum amount of elements.
|
||||
let head = self.inner.head.load(Ordering::Acquire);
|
||||
let tail = self.inner.tail.load(Ordering::Relaxed);
|
||||
|
||||
let wrapped_tail = tail & self.inner.capacity_mask;
|
||||
let wrapped_head = head & self.inner.capacity_mask;
|
||||
|
||||
// While we have this information, the back can progress, but never in an unsafe way
|
||||
// Since it can only free up spots
|
||||
|
||||
if tail == head
|
||||
{
|
||||
// Buffer is empty. Return empty slice
|
||||
unsafe {
|
||||
let k = &mut *self.inner.buffer.get();
|
||||
StreamReader {
|
||||
producer: self,
|
||||
first: &mut k[wrapped_tail..wrapped_head],
|
||||
second: None,
|
||||
read: 0,
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// Necessarly: wrapped_tail < wrapped_head
|
||||
// Two cases : The buffer overlaps the wrapping or not
|
||||
if wrapped_tail < wrapped_head
|
||||
{
|
||||
// Current configuration :
|
||||
// ▯▯▯▮▮▮▮▮▮▯▯▯▯
|
||||
// | |
|
||||
// tail head
|
||||
// _______
|
||||
// slice1
|
||||
|
||||
// SAFETY:
|
||||
//
|
||||
// Since the StreamWriter object borrows the producer
|
||||
// we will return the only mutable refence to the empty part of the buffer
|
||||
//
|
||||
// Head and tail are both indices of the slice
|
||||
unsafe {
|
||||
let k = &mut *self.inner.buffer.get();
|
||||
StreamReader {
|
||||
producer: self,
|
||||
first: &mut k[wrapped_tail..wrapped_head],
|
||||
second: None,
|
||||
read: 0,
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// Current configuration :
|
||||
// ▮▮▮▯▯▯▯▯▯▮▮▮▮
|
||||
// | |
|
||||
// head tail
|
||||
// ___ ____
|
||||
// slice2 slice1
|
||||
//
|
||||
// Or
|
||||
// ▮▮▮▮▮▮▮▮▮▮▮▮▮
|
||||
// |
|
||||
// tail & head
|
||||
// (full)
|
||||
// ______.______
|
||||
// slice2 slice1
|
||||
|
||||
// SAFETY:
|
||||
//
|
||||
// Since the StreamWriter object borrows the producer
|
||||
// we will return the only, mutually exclusive, mutable refences to the empty part of the buffer
|
||||
//
|
||||
// We know as per the previous diagram
|
||||
// That both slices are mutually exclusive
|
||||
// and that tail and head are indices of the slice
|
||||
unsafe {
|
||||
let k = &mut *self.inner.buffer.get();
|
||||
|
||||
let (start_to_tail, tail_to_end) = k.split_at_mut_unchecked(wrapped_tail);
|
||||
let (start_to_head, _head_to_tail) =
|
||||
start_to_tail.split_at_mut_unchecked(wrapped_head);
|
||||
|
||||
let first = tail_to_end;
|
||||
let second = Some(start_to_head);
|
||||
StreamReader {
|
||||
producer: self,
|
||||
first,
|
||||
second,
|
||||
read: 0,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, T> StreamWriter<'a, T>
|
||||
{
|
||||
pub fn len(&self) -> usize
|
||||
{
|
||||
self.first.len()
|
||||
+ match &self.second
|
||||
{
|
||||
Some(x) => x.len(),
|
||||
None => 0,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn is_empty(&self) -> bool
|
||||
{
|
||||
self.len() == 0
|
||||
}
|
||||
|
||||
pub fn push(&mut self, element: T) -> Result<(), T>
|
||||
{
|
||||
if self.written < self.first.len()
|
||||
{
|
||||
self.first[self.written] = MaybeUninit::new(element);
|
||||
self.written += 1;
|
||||
Ok(())
|
||||
}
|
||||
else if let Some(second) = &mut self.second
|
||||
&& self.written - self.first.len() < second.len()
|
||||
{
|
||||
second[self.written - self.first.len()] = MaybeUninit::new(element);
|
||||
self.written += 1;
|
||||
Ok(())
|
||||
}
|
||||
else
|
||||
{
|
||||
Err(element)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, T> StreamReader<'a, T>
|
||||
{
|
||||
pub fn len(&self) -> usize
|
||||
{
|
||||
self.first.len()
|
||||
+ match &self.second
|
||||
{
|
||||
Some(x) => x.len(),
|
||||
None => 0,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn is_empty(&self) -> bool
|
||||
{
|
||||
self.len() == 0
|
||||
}
|
||||
|
||||
pub fn pop(&mut self) -> Option<T>
|
||||
{
|
||||
if self.read < self.first.len()
|
||||
{
|
||||
// SAFETY:
|
||||
//
|
||||
// If element is in this slice, it is initialized.
|
||||
// We take it once since read increases
|
||||
let element = unsafe {
|
||||
std::mem::replace(&mut self.first[self.read], MaybeUninit::uninit()).assume_init()
|
||||
};
|
||||
self.read += 1;
|
||||
Some(element)
|
||||
}
|
||||
else if let Some(second) = &mut self.second
|
||||
&& self.read - self.first.len() < second.len()
|
||||
{
|
||||
let element = unsafe {
|
||||
std::mem::replace(
|
||||
&mut second[self.read - self.first.len()],
|
||||
MaybeUninit::uninit(),
|
||||
)
|
||||
.assume_init()
|
||||
};
|
||||
self.read += 1;
|
||||
Some(element)
|
||||
}
|
||||
else
|
||||
{
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// When a Stream writer goes out of scope, it wrote
|
||||
// some things into the stream. These things need to be commited to the queue
|
||||
impl<'a, T> Drop for StreamWriter<'a, T>
|
||||
{
|
||||
fn drop(&mut self)
|
||||
{
|
||||
// Advance head.
|
||||
// We know that this value hasn't changed since this StreamWriter was created
|
||||
let head = self.producer.inner.head.load(Ordering::Relaxed);
|
||||
|
||||
// We want writes to the buffer to be visible when acquired in the pop side
|
||||
self.producer
|
||||
.inner
|
||||
.head
|
||||
.store(head + self.written, Ordering::Release);
|
||||
}
|
||||
}
|
||||
|
||||
// When a Stream reader goes out of scope, it took
|
||||
// some things from the stream. These things need to be de-commited to the queue
|
||||
impl<'a, T> Drop for StreamReader<'a, T>
|
||||
{
|
||||
fn drop(&mut self)
|
||||
{
|
||||
// Advance tail.
|
||||
// We know that this value hasn't changed since this StreamWriter was created
|
||||
let tail = self.producer.inner.tail.load(Ordering::Relaxed);
|
||||
|
||||
// We want writes to the buffer to be visible when acquired in the push side
|
||||
self.producer
|
||||
.inner
|
||||
.tail
|
||||
.store(tail + self.read, Ordering::Release);
|
||||
}
|
||||
}
|
||||
|
||||
mod test
|
||||
{
|
||||
#[allow(unused_imports)]
|
||||
use crate::stream::bounded_queue;
|
||||
|
||||
// Test push and pop on single thread
|
||||
#[test]
|
||||
pub fn stream_simple_push_pop()
|
||||
{
|
||||
let (mut tx, mut rx) = bounded_queue::<usize>(4);
|
||||
|
||||
{
|
||||
let mut writer = tx.write();
|
||||
|
||||
assert_eq!(writer.len(), 4);
|
||||
|
||||
assert_eq!(writer.push(1), Ok(()));
|
||||
assert_eq!(writer.push(2), Ok(()));
|
||||
assert_eq!(writer.push(3), Ok(()));
|
||||
assert_eq!(writer.push(4), Ok(()));
|
||||
assert_eq!(writer.push(5), Err(5));
|
||||
}
|
||||
|
||||
{
|
||||
let mut reader = rx.read();
|
||||
|
||||
assert_eq!(reader.len(), 4);
|
||||
|
||||
assert_eq!(reader.pop(), Some(1));
|
||||
assert_eq!(reader.pop(), Some(2));
|
||||
assert_eq!(reader.pop(), Some(3));
|
||||
assert_eq!(reader.pop(), Some(4));
|
||||
assert_eq!(reader.pop(), None);
|
||||
}
|
||||
|
||||
// Put stream into weird situatino
|
||||
{
|
||||
let mut writer = tx.write();
|
||||
assert_eq!(writer.push(1), Ok(()));
|
||||
assert_eq!(writer.push(2), Ok(()));
|
||||
assert_eq!(writer.push(3), Ok(()));
|
||||
assert_eq!(writer.push(4), Ok(()));
|
||||
}
|
||||
|
||||
{
|
||||
let mut reader = rx.read();
|
||||
assert_eq!(reader.pop(), Some(1));
|
||||
assert_eq!(reader.pop(), Some(2));
|
||||
}
|
||||
|
||||
{
|
||||
let mut writer = tx.write();
|
||||
assert_eq!(writer.len(), 2);
|
||||
assert_eq!(writer.push(5), Ok(()));
|
||||
assert_eq!(writer.push(6), Ok(()));
|
||||
}
|
||||
|
||||
{
|
||||
let mut reader = rx.read();
|
||||
assert_eq!(reader.pop(), Some(3));
|
||||
assert_eq!(reader.pop(), Some(4));
|
||||
assert_eq!(reader.pop(), Some(5));
|
||||
assert_eq!(reader.pop(), Some(6));
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user