Starting BlockSync
This commit is contained in:
@ -1,8 +1,8 @@
|
||||
use oxydsp_flowgraph::{
|
||||
block::SyncBlock,
|
||||
edge::{In, Out},
|
||||
};
|
||||
use oxydsp_flowgraph_macros::{BlockIO, sync_block};
|
||||
use oxydsp_flowgraph::block::SyncBlock;
|
||||
use oxydsp_flowgraph::edge::In;
|
||||
use oxydsp_flowgraph::edge::Out;
|
||||
use oxydsp_flowgraph_macros::BlockIO;
|
||||
use oxydsp_flowgraph_macros::sync_block;
|
||||
|
||||
#[derive(BlockIO)]
|
||||
//#[sync_block]
|
||||
@ -24,9 +24,10 @@ impl oxydsp_flowgraph::block::Block for Test
|
||||
len = len.min(input_reader.len());
|
||||
let mut output_writer = self.output.write();
|
||||
len = len.min(output_writer.len());
|
||||
//let input = input_reader.pop().unwrap();
|
||||
let input = 0;
|
||||
for _ in 0..len
|
||||
{
|
||||
let input = input_reader.pop().unwrap();
|
||||
let (output_out,) = self.sync_work((input,));
|
||||
output_writer.push(output_out).unwrap();
|
||||
}
|
||||
|
||||
@ -55,9 +55,11 @@ unsafe impl<T> Sync for StreamConsumer<T> {}
|
||||
// Represents a write operation within a stream producer
|
||||
pub struct StreamWriter<'a, T>
|
||||
{
|
||||
producer: &'a StreamProducer<T>,
|
||||
producer: &'a mut StreamProducer<T>,
|
||||
first: &'a UnsafeCell<[MaybeUninit<T>]>,
|
||||
second: Option<&'a UnsafeCell<[MaybeUninit<T>]>>,
|
||||
first_len: usize,
|
||||
second_len: usize,
|
||||
written: usize,
|
||||
}
|
||||
|
||||
@ -65,8 +67,10 @@ pub struct StreamWriter<'a, T>
|
||||
pub struct StreamReader<'a, T>
|
||||
{
|
||||
producer: &'a StreamConsumer<T>,
|
||||
first: &'a mut [MaybeUninit<T>],
|
||||
second: Option<&'a mut [MaybeUninit<T>]>,
|
||||
first: &'a UnsafeCell<[MaybeUninit<T>]>,
|
||||
second: Option<&'a UnsafeCell<[MaybeUninit<T>]>>,
|
||||
first_len: usize,
|
||||
second_len: usize,
|
||||
read: usize,
|
||||
}
|
||||
|
||||
@ -152,12 +156,16 @@ impl<T> StreamProducer<T>
|
||||
// This functions borrows the stream mutably. As such, only one instance
|
||||
// of StreamWriter can exist for a given stream. The StreamWriter
|
||||
// is thus the only on able to write or read the stream when it lives
|
||||
let first = head_to_end;
|
||||
let second = Some(start_to_tail);
|
||||
let first_len = head_to_end.len();
|
||||
let second_len = start_to_tail.len();
|
||||
let first = std::mem::transmute(head_to_end);
|
||||
let second = Some(std::mem::transmute(start_to_tail));
|
||||
StreamWriter {
|
||||
producer: self,
|
||||
first,
|
||||
second,
|
||||
first_len,
|
||||
second_len,
|
||||
written: 0,
|
||||
}
|
||||
}
|
||||
@ -196,12 +204,16 @@ impl<T> StreamProducer<T>
|
||||
let (start_to_tail, _tail_to_head) =
|
||||
start_to_head.split_at_mut_unchecked(wrapped_tail);
|
||||
|
||||
let first = head_to_end;
|
||||
let second = Some(start_to_tail);
|
||||
let first_len = head_to_end.len();
|
||||
let second_len = start_to_tail.len();
|
||||
let first = std::mem::transmute(head_to_end);
|
||||
let second = Some(std::mem::transmute(start_to_tail));
|
||||
StreamWriter {
|
||||
producer: self,
|
||||
first,
|
||||
second,
|
||||
first_len,
|
||||
second_len,
|
||||
written: 0,
|
||||
}
|
||||
}
|
||||
@ -233,7 +245,9 @@ impl<T> StreamProducer<T>
|
||||
let k = &mut *self.inner.buffer.get();
|
||||
StreamWriter {
|
||||
producer: self,
|
||||
first: &mut k[wrapped_head..wrapped_tail],
|
||||
first_len: k.len(),
|
||||
second_len: 0,
|
||||
first: std::mem::transmute(&k[wrapped_head..wrapped_tail]),
|
||||
second: None,
|
||||
written: 0,
|
||||
}
|
||||
@ -264,7 +278,9 @@ impl<T> StreamConsumer<T>
|
||||
let k = &mut *self.inner.buffer.get();
|
||||
StreamReader {
|
||||
producer: self,
|
||||
first: &mut k[wrapped_tail..wrapped_head],
|
||||
first_len: k.len(),
|
||||
second_len: 0,
|
||||
first: std::mem::transmute(&k[wrapped_tail..wrapped_head]),
|
||||
second: None,
|
||||
read: 0,
|
||||
}
|
||||
@ -293,7 +309,9 @@ impl<T> StreamConsumer<T>
|
||||
let k = &mut *self.inner.buffer.get();
|
||||
StreamReader {
|
||||
producer: self,
|
||||
first: &mut k[wrapped_tail..wrapped_head],
|
||||
first_len: k.len(),
|
||||
second_len: 0,
|
||||
first: std::mem::transmute(&k[wrapped_tail..wrapped_head]),
|
||||
second: None,
|
||||
read: 0,
|
||||
}
|
||||
@ -331,12 +349,16 @@ impl<T> StreamConsumer<T>
|
||||
let (start_to_head, _head_to_tail) =
|
||||
start_to_tail.split_at_mut_unchecked(wrapped_head);
|
||||
|
||||
let first = tail_to_end;
|
||||
let second = Some(start_to_head);
|
||||
let first_len = tail_to_end.len();
|
||||
let second_len = start_to_head.len();
|
||||
let first = std::mem::transmute(tail_to_end);
|
||||
let second = Some(std::mem::transmute(start_to_head));
|
||||
StreamReader {
|
||||
producer: self,
|
||||
first,
|
||||
second,
|
||||
first_len,
|
||||
second_len,
|
||||
read: 0,
|
||||
}
|
||||
}
|
||||
@ -349,12 +371,7 @@ impl<'a, T> StreamWriter<'a, T>
|
||||
{
|
||||
pub fn len(&self) -> usize
|
||||
{
|
||||
self.first.len()
|
||||
+ match &self.second
|
||||
{
|
||||
Some(x) => x.len(),
|
||||
None => 0,
|
||||
}
|
||||
self.first_len + self.second_len
|
||||
}
|
||||
|
||||
pub fn is_empty(&self) -> bool
|
||||
@ -362,18 +379,22 @@ impl<'a, T> StreamWriter<'a, T>
|
||||
self.len() == 0
|
||||
}
|
||||
|
||||
pub fn push(&self, element: T) -> Result<(), T>
|
||||
pub fn push(&mut self, element: T) -> Result<(), T>
|
||||
{
|
||||
if self.written < self.first.len()
|
||||
if self.written < self.first_len
|
||||
{
|
||||
self.first[self.written] = MaybeUninit::new(element);
|
||||
unsafe {
|
||||
(&mut *self.first.get())[self.written] = MaybeUninit::new(element);
|
||||
}
|
||||
self.written += 1;
|
||||
Ok(())
|
||||
}
|
||||
else if let Some(second) = &mut self.second
|
||||
&& self.written - self.first.len() < second.len()
|
||||
&& self.written - self.first_len < self.second_len
|
||||
{
|
||||
second[self.written - self.first.len()] = MaybeUninit::new(element);
|
||||
unsafe {
|
||||
(&mut *second.get())[self.written - self.first_len] = MaybeUninit::new(element);
|
||||
}
|
||||
self.written += 1;
|
||||
Ok(())
|
||||
}
|
||||
@ -388,12 +409,7 @@ impl<'a, T> StreamReader<'a, T>
|
||||
{
|
||||
pub fn len(&self) -> usize
|
||||
{
|
||||
self.first.len()
|
||||
+ match &self.second
|
||||
{
|
||||
Some(x) => x.len(),
|
||||
None => 0,
|
||||
}
|
||||
self.first_len + self.second_len
|
||||
}
|
||||
|
||||
pub fn is_empty(&self) -> bool
|
||||
@ -403,24 +419,28 @@ impl<'a, T> StreamReader<'a, T>
|
||||
|
||||
pub fn pop(&mut self) -> Option<T>
|
||||
{
|
||||
if self.read < self.first.len()
|
||||
if self.read < self.first_len
|
||||
{
|
||||
// SAFETY:
|
||||
//
|
||||
// If element is in this slice, it is initialized.
|
||||
// We take it once since read increases
|
||||
let element = unsafe {
|
||||
std::mem::replace(&mut self.first[self.read], MaybeUninit::uninit()).assume_init()
|
||||
std::mem::replace(
|
||||
&mut (&mut *self.first.get())[self.read],
|
||||
MaybeUninit::uninit(),
|
||||
)
|
||||
.assume_init()
|
||||
};
|
||||
self.read += 1;
|
||||
Some(element)
|
||||
}
|
||||
else if let Some(second) = &mut self.second
|
||||
&& self.read - self.first.len() < second.len()
|
||||
&& self.read - self.first_len < self.second_len
|
||||
{
|
||||
let element = unsafe {
|
||||
std::mem::replace(
|
||||
&mut second[self.read - self.first.len()],
|
||||
&mut (&mut *second.get())[self.read - self.first_len],
|
||||
MaybeUninit::uninit(),
|
||||
)
|
||||
.assume_init()
|
||||
|
||||
Reference in New Issue
Block a user