Working base

This commit is contained in:
2026-03-13 21:57:10 +01:00
parent 092fb0191f
commit 866a5dd501
23 changed files with 1050 additions and 321 deletions

View File

@ -4,4 +4,4 @@ version = "0.1.0"
edition = "2024"
[dependencies]
oxydsp-flowgraph-macros = { path = "../oxydsp-flowgraph-macros" }
oxydsp-flowgraph-macros = { path = "./oxydsp-flowgraph-macros" }

View File

@ -0,0 +1,10 @@
[package]
name = "oxydsp-flowgraph-macros"
version = "0.1.0"
edition = "2024"
[lib]
proc-macro = true
[dependencies]
zyn = {version = "0.5", features = ["ext", "pretty"]}

View File

@ -0,0 +1,492 @@
use proc_macro::TokenStream;
use zyn::ToTokens;
use zyn::ext::AttrExt;
use zyn::ext::FieldsExt;
use zyn::format_ident;
use zyn::parse_input;
use zyn::syn::Index;
use zyn::syn::Lit;
use zyn::syn::spanned::Spanned;
#[zyn::derive("BlockIO", attributes(input, output), debug = "pretty")]
pub fn block_io(
#[zyn(input)] ident: zyn::Extract<zyn::syn::Ident>,
#[zyn(input)] generics: zyn::Extract<zyn::syn::Generics>,
#[zyn(input)] fields: zyn::Fields,
) -> zyn::TokenStream
{
let ident = ident.inner();
let (impl_generics, type_generics, where_clause) = generics.split_for_impl();
zyn::zyn!(
impl {{impl_generics}} oxydsp_flowgraph::block::BlockIO for {{ ident.clone() }} {{ type_generics }}
{{ where_clause }}
{
@block_io_set_index(fields = fields.clone())
@block_io_get_successors(fields = fields.clone())
@block_io_counts(fields = fields.clone())
@block_io_set_streams(fields = fields.clone())
@block_io_create_stream(fields = fields.clone())
@block_io_get_meta(ident = ident.clone(), fields = fields.clone())
}
)
}
#[zyn::element]
fn block_io_set_index(fields: zyn::syn::Fields) -> zyn::TokenStream
{
let fields = fields.as_named().unwrap().named.clone();
zyn::zyn!(
fn set_index(&self, block_index: usize)
{
use oxydsp_flowgraph::edge::BlockIOIndex;
@for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("input"))).enumerate())
{
self.{{field.1.ident}}.set_block_index(BlockIOIndex {block_index, port_index: {{ field.0 }} });
}
@for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("output"))).enumerate())
{
self.{{field.1.ident}}.set_block_index(BlockIOIndex {block_index, port_index: {{ field.0 }} });
}
}
)
}
#[zyn::element]
fn block_io_get_successors(fields: zyn::syn::Fields) -> zyn::TokenStream
{
let fields = fields.as_named().unwrap().named.clone();
zyn::zyn!(
fn get_successors(&self) -> Vec<oxydsp_flowgraph::edge::BlockIOIndex>
{
let mut output = vec![];
@for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("output"))).enumerate())
{
if let Some(block_index) = self.{{ field.1.ident }}.get_consumer_block()
{
output.push(block_index);
}
}
output
}
)
}
#[zyn::element]
fn block_io_get_meta(ident: zyn::syn::Ident, fields: zyn::syn::Fields) -> zyn::TokenStream
{
let fields = fields.as_named().unwrap().named.clone();
zyn::zyn!(
fn get_block_name(&self) -> &'static str
{
return {{ ident.to_string() }};
}
fn get_input_names(&self) -> Vec<&'static str>
{
let mut output = Vec::new();
@for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("input"))).enumerate())
{
output.push({{ field.1.ident.clone().unwrap().to_string() }});
}
return output;
}
fn get_output_names(&self) -> Vec<&'static str>
{
let mut output = Vec::new();
@for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("output"))).enumerate())
{
output.push({{ field.1.ident.clone().unwrap().to_string() }});
}
return output;
}
fn get_output_type_names(&self) -> Vec<&'static str>
{
let mut output = Vec::new();
@for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("output"))).enumerate())
{
output.push(self.{{ field.1.ident.clone() }}.get_type_name());
}
return output;
}
)
}
#[zyn::element]
fn block_io_counts(fields: zyn::syn::Fields) -> zyn::TokenStream
{
let fields = fields.as_named().unwrap().named.clone();
let input_count = fields
.iter()
.filter(|x| x.attrs.iter().any(|x| x.is("input")))
.count();
let output_count = fields
.iter()
.filter(|x| x.attrs.iter().any(|x| x.is("output")))
.count();
zyn::zyn!(
fn input_count(&self) -> usize
{
return { { input_count } };
}
fn output_count(&self) -> usize
{
return { { output_count } };
}
)
}
#[zyn::element(debug = "pretty")]
fn block_io_set_streams(fields: zyn::syn::Fields) -> zyn::TokenStream
{
zyn::zyn!(
#[allow(unreachable_code)]
fn set_anonymous_out_stream(
&mut self,
output_index: usize,
producer: oxydsp_flowgraph::edge::AnonymousStreamProducer,
)
{
match output_index
{
@for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("output"))).enumerate())
{
{{ field.0 }} => self.{{field.1.ident}}.set_anonymous_stream(producer),
}
_ => panic!("output_index out of bounds.")
};
}
#[allow(unreachable_code)]
fn set_anonymous_in_stream(&mut self, input_index: usize, consumer: oxydsp_flowgraph::edge::AnonymousStreamConsumer)
{
match input_index
{
@for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("input"))).enumerate())
{
{{ field.0 }} => self.{{field.1.ident}}.set_anonymous_stream(consumer),
}
_ => panic!("output_index out of bounds.")
};
}
)
}
#[zyn::element]
fn block_io_create_stream(fields: zyn::syn::Fields) -> zyn::TokenStream
{
zyn::zyn!(
#[allow(unreachable_code)]
fn create_anonymous_stream_for(
&mut self,
output_index: usize,
capacity: usize
) -> (oxydsp_flowgraph::edge::AnonymousStreamProducer, oxydsp_flowgraph::edge::AnonymousStreamConsumer)
{
let (tx, rx): (oxydsp_flowgraph::edge::AnonymousStreamProducer, oxydsp_flowgraph::edge::AnonymousStreamConsumer)
= match output_index
{
@for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("output"))).enumerate())
{
{{ field.0 }} =>
{
let (tx, rx) = oxydsp_flowgraph::stream::bounded_queue::<@out_inner_type(ty = field.1.ty.clone())>(capacity);
(tx.into(), rx.into())
},
}
_ => panic!("output_index out of bounds.")
};
(tx, rx)
}
)
}
#[zyn::element]
fn out_inner_type(ty: zyn::syn::Type) -> zyn::TokenStream
{
let out_ty = match ty
{
zyn::syn::Type::Path(type_path) => match &type_path.path.segments.last().unwrap().arguments
{
zyn::syn::PathArguments::AngleBracketed(args) => match args.args.first().unwrap()
{
zyn::syn::GenericArgument::Type(x) => Some(x.clone().to_token_stream()),
_ => None,
},
_ => None,
},
_ => None,
};
if out_ty.is_none()
{
bail!("Output type must be a Out<T> type."; span = ty.span());
}
out_ty.unwrap()
}
// Sync block
#[zyn::attribute]
pub fn sync_block(#[zyn(input)] item: zyn::syn::ItemStruct) -> zyn::TokenStream
{
let mut strcut_item = item.clone();
let (impl_generics, type_generics, where_clause) = item.generics.split_for_impl();
let fields = &item.fields.as_named().unwrap().named;
// Get state fields
let mut state_fields = vec![];
for field in strcut_item.fields.iter_mut()
{
let attr_index = field.attrs.iter().enumerate().find_map(|(i, attr)| {
if attr.is("sync_state") { Some(i) } else { None }
});
if let Some(state_field) = &field.ident
&& let Some(attr_index) = attr_index
{
state_fields.push(state_field.clone());
field.attrs.remove(attr_index);
}
}
zyn::zyn!(
{{ strcut_item }}
impl {{ impl_generics }} oxydsp_flowgraph::block::Block for {{ strcut_item.ident }} {{ type_generics }}
where {{ where_clause }}
{
fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult
{
let mut len = usize::MAX;
@for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("input"))).enumerate())
{
let mut {{ field.1.ident.clone().unwrap() | ident:"{}_reader" }} = self.{{field.1.ident}}.read();
len = len.min({{ field.1.ident.clone().unwrap() | ident:"{}_reader" }}.len());
}
@for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("output"))).enumerate())
{
let mut {{ field.1.ident.clone().unwrap() | ident: "{}_writer" }} = self.{{field.1.ident}}.write();
len = len.min({{ field.1.ident.clone().unwrap() | ident: "{}_writer" }}.len());
}
for _ in 0..len
{
if let Some((
@for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("output"))).enumerate())
{
{{ field.1.ident.clone().unwrap() | ident: "{}_out" }},
}
)) = Self::sync_work(
(
@for (state_field in state_fields)
{
&mut self.{{ state_field }},
}
),
(
@for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("input"))).enumerate())
{
{{ field.1.ident.clone().unwrap() | ident: "{}_reader" }}.pop().unwrap(),
}
)
)
{
@for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("output"))).enumerate())
{
let _ = {{ field.1.ident.clone().unwrap() | ident: "{}_writer" }}.push({{ field.1.ident.clone().unwrap() | ident: "{}_out" }});
}
} else
{
return oxydsp_flowgraph::block::BlockResult::Terminated;
}
}
oxydsp_flowgraph::block::BlockResult::Ok
}
}
)
}
// Generate
#[proc_macro]
pub fn generate_pop_iterable_tuple_impl(input: TokenStream) -> TokenStream
{
let count = parse_input!(input as Lit);
let count: usize = match count
{
Lit::Int(lit_int) => lit_int.base10_parse::<usize>().unwrap(),
_ =>
{
return zyn::syn::Error::new(count.span(), "Must be an integer")
.to_compile_error()
.into();
}
};
let generics = [
format_ident!("A"),
format_ident!("B"),
format_ident!("C"),
format_ident!("D"),
format_ident!("E"),
format_ident!("F"),
format_ident!("G"),
format_ident!("H"),
format_ident!("I"),
format_ident!("J"),
format_ident!("K"),
format_ident!("L"),
format_ident!("M"),
format_ident!("N"),
format_ident!("O"),
format_ident!("P"),
format_ident!("Q"),
format_ident!("R"),
format_ident!("S"),
format_ident!("T"),
format_ident!("U"),
format_ident!("V"),
format_ident!("W"),
format_ident!("X"),
format_ident!("Y"),
format_ident!("Z"),
];
zyn::zyn!(
impl<'a,
@for (i in 0..count)
{
{{ generics[i] }}: 'static,
}
> PopIterable<'a> for (
@for (i in 0..count)
{
&mut In<{{ generics[i] }}>,
}
)
{
type Output = (
@for (i in 0..count)
{
StreamReader<'a, {{ generics[i] }}>,
}
);
fn pop_iter(&'a mut self) -> PopIter<Self::Output>
{
@for (i in 0..count)
{
let {{ i | ident:"reader_{}" }} = self.{{ Index::from(i) }}.read();
}
let len = [
@for (i in 0..count)
{
{{ i | ident:"reader_{}" }}.len(),
}
].into_iter().min().unwrap();
PopIter {
popped: 0,
len,
reader: (
@for (i in 0..count)
{
{{ i | ident:"reader_{}" }},
}
),
}
}
}
)
.to_token_stream()
.into()
}
#[proc_macro]
pub fn impl_iterator_for_pop_iter_tuple(input: TokenStream) -> TokenStream
{
let count = parse_input!(input as Lit);
let count: usize = match count
{
Lit::Int(lit_int) => lit_int.base10_parse::<usize>().unwrap(),
_ =>
{
return zyn::syn::Error::new(count.span(), "Must be an integer")
.to_compile_error()
.into();
}
};
let generics = [
format_ident!("A"),
format_ident!("B"),
format_ident!("C"),
format_ident!("D"),
format_ident!("E"),
format_ident!("F"),
format_ident!("G"),
format_ident!("H"),
format_ident!("I"),
format_ident!("J"),
format_ident!("K"),
format_ident!("L"),
format_ident!("M"),
format_ident!("N"),
format_ident!("O"),
format_ident!("P"),
format_ident!("Q"),
format_ident!("R"),
format_ident!("S"),
format_ident!("T"),
format_ident!("U"),
format_ident!("V"),
format_ident!("W"),
format_ident!("X"),
format_ident!("Y"),
format_ident!("Z"),
];
zyn::zyn!(
impl<'a,
@for (i in 0..count)
{
{{ generics[i] }}: 'static,
}
> Iterator for PopIter<(
@for (i in 0..count)
{
StreamReader<'a, {{ generics[i] }}>,
}
)>
{
type Item = (
@for (i in 0..count)
{
{{ generics[i] }},
}
);
fn next(&mut self) -> Option<Self::Item>
{
if self.popped < self.len
{
self.popped += 1;
Some((
@for (i in 0..count)
{
self.reader.{{ Index::from(i) }}.pop().unwrap(),
}
))
}
else
{
None
}
}
}
)
.to_token_stream()
.into()
}

View File

@ -2,7 +2,12 @@ use crate::edge::{AnonymousStreamConsumer, AnonymousStreamProducer, BlockIOIndex
pub enum BlockResult
{
// Signifies that the block can be scheduled again.
Ok,
// Signifies that the block finished its work
// Running it again would be useless
// This triggers the graph shutdown
Terminated,
}
@ -28,6 +33,12 @@ pub trait BlockIO
output_index: usize,
capacity: usize,
) -> (AnonymousStreamProducer, AnonymousStreamConsumer);
// Meta information
fn get_block_name(&self) -> &'static str;
fn get_input_names(&self) -> Vec<&'static str>;
fn get_output_names(&self) -> Vec<&'static str>;
fn get_output_type_names(&self) -> Vec<&'static str>;
}
pub trait Block
@ -39,8 +50,11 @@ pub trait SyncBlock
{
type Input;
type Output;
type State;
fn sync_work(&mut self, input: Self::Input) -> Self::Output;
fn sync_work(state: &mut Self::State, input: Self::Input) -> Option<Self::Output>;
}
pub trait GraphableBlock: Block + BlockIO {}
impl<T> GraphableBlock for T where T: Block + BlockIO {}

View File

@ -1,13 +1,18 @@
use std::any::Any;
use std::collections::binary_heap::Iter;
use std::sync::Arc;
use std::sync::Mutex;
use oxydsp_flowgraph_macros::generate_pop_iterable_tuple_impl;
use oxydsp_flowgraph_macros::impl_iterator_for_pop_iter_tuple;
use crate::stream;
use crate::stream::StreamConsumer;
use crate::stream::StreamProducer;
use crate::stream::StreamReader;
use crate::stream::StreamWriter;
#[derive(Default)]
pub struct Edge
{
// Represents the index of the block owning the Out end in the graph
@ -90,6 +95,18 @@ pub struct Out<T>
edge: Arc<Mutex<Edge>>,
}
pub fn stream<T>() -> (Out<T>, In<T>)
{
let edge = Arc::new(Mutex::new(Edge::default()));
(
Out {
stream: None,
edge: edge.clone(),
},
In { stream: None, edge },
)
}
impl<T: 'static> In<T>
{
pub fn set_block_index(&self, index: BlockIOIndex)
@ -145,4 +162,106 @@ impl<T: 'static> Out<T>
{
self.stream.as_mut().unwrap().write()
}
pub fn push_iter<I: Iterator<Item = T>>(&mut self, mut iter: I) -> bool
{
let writer = self.write();
let len = writer.len();
for _ in 0..len
{
if let Some(elt) = iter.next()
{
let _ = writer.push(elt);
}
else
{
return false;
}
}
true
}
// Meta information
pub fn get_type_name(&self) -> &'static str
{
std::any::type_name::<T>()
}
}
pub struct PopIter<T>
{
len: usize,
popped: usize,
reader: T,
}
pub trait PopIterable<'a>
{
type Output;
fn pop_iter(&'a mut self) -> PopIter<Self::Output>;
}
impl<'a, T: 'static> PopIterable<'a> for In<T>
{
type Output = StreamReader<'a, T>;
fn pop_iter(&'a mut self) -> PopIter<StreamReader<'a, T>>
{
let reader = self.read();
PopIter {
popped: 0,
len: reader.len(),
reader,
}
}
}
generate_pop_iterable_tuple_impl! {2}
generate_pop_iterable_tuple_impl! {3}
generate_pop_iterable_tuple_impl! {4}
generate_pop_iterable_tuple_impl! {5}
generate_pop_iterable_tuple_impl! {6}
generate_pop_iterable_tuple_impl! {7}
generate_pop_iterable_tuple_impl! {8}
generate_pop_iterable_tuple_impl! {9}
generate_pop_iterable_tuple_impl! {10}
generate_pop_iterable_tuple_impl! {11}
generate_pop_iterable_tuple_impl! {12}
generate_pop_iterable_tuple_impl! {13}
generate_pop_iterable_tuple_impl! {14}
generate_pop_iterable_tuple_impl! {15}
generate_pop_iterable_tuple_impl! {16}
generate_pop_iterable_tuple_impl! {17}
generate_pop_iterable_tuple_impl! {18}
generate_pop_iterable_tuple_impl! {19}
generate_pop_iterable_tuple_impl! {20}
impl<'a, T> Iterator for PopIter<StreamReader<'a, T>>
{
type Item = T;
fn next(&mut self) -> Option<Self::Item>
{
self.reader.pop()
}
}
impl_iterator_for_pop_iter_tuple! {2}
impl_iterator_for_pop_iter_tuple! {3}
impl_iterator_for_pop_iter_tuple! {4}
impl_iterator_for_pop_iter_tuple! {5}
impl_iterator_for_pop_iter_tuple! {6}
impl_iterator_for_pop_iter_tuple! {7}
impl_iterator_for_pop_iter_tuple! {8}
impl_iterator_for_pop_iter_tuple! {9}
impl_iterator_for_pop_iter_tuple! {10}
impl_iterator_for_pop_iter_tuple! {11}
impl_iterator_for_pop_iter_tuple! {12}
impl_iterator_for_pop_iter_tuple! {13}
impl_iterator_for_pop_iter_tuple! {14}
impl_iterator_for_pop_iter_tuple! {15}
impl_iterator_for_pop_iter_tuple! {16}
impl_iterator_for_pop_iter_tuple! {17}
impl_iterator_for_pop_iter_tuple! {18}
impl_iterator_for_pop_iter_tuple! {19}
impl_iterator_for_pop_iter_tuple! {20}

View File

@ -1,5 +1,20 @@
use crate::block::GraphableBlock;
#[macro_export]
macro_rules! flowgraph
{
($($x:ident),* $(,)?) =>
{
{
let mut flowgraph = FlowGraph::new();
$(
flowgraph.add_block($x);
)*
flowgraph
}
}
}
pub struct FlowGraph
{
blocks: Vec<Box<dyn GraphableBlock + Send + 'static>>,
@ -17,6 +32,130 @@ impl FlowGraph
block.set_index(self.blocks.len());
self.blocks.push(Box::new(block));
}
pub fn run(mut self)
{
self.populate_edges();
let mut k = vec![];
for mut x in self.blocks.into_iter()
{
k.push(std::thread::spawn(move || {
loop
{
x.work();
}
}));
}
k.into_iter().for_each(|j| {
let _ = j.join();
});
}
fn populate_edges(&mut self)
{
for block_index in 0..self.blocks.len()
{
let successors = self.blocks[block_index].get_successors();
for (output_index, succ_id) in successors.iter().enumerate()
{
let (tx, rx) =
self.blocks[block_index].create_anonymous_stream_for(output_index, 4096);
self.blocks[block_index].set_anonymous_out_stream(output_index, tx);
self.blocks[succ_id.block_index].set_anonymous_in_stream(succ_id.port_index, rx);
}
}
}
pub fn get_dot(&self) -> String
{
let mut node_string = String::new();
for (i, block) in self.blocks.iter().enumerate()
{
// Block name
// Input strings
let mut input_string = String::new();
let inputs = block.get_input_names();
let len = inputs.len();
if !inputs.is_empty()
{
input_string.push('{');
for (j, input) in inputs.iter().enumerate()
{
input_string.push_str(&format!("<i{}> {}", j, input));
if j != len - 1
{
input_string.push('|');
}
}
input_string.push_str("}|");
}
// Output strings
let mut output_string = String::new();
let outputs = block.get_output_names();
let len = outputs.len();
if !outputs.is_empty()
{
output_string.push_str("|{");
for (j, output) in outputs.iter().enumerate()
{
output_string.push_str(&format!("<o{}> {}", j, output));
if j != len - 1
{
output_string.push('|');
}
}
output_string.push('}');
}
node_string.push_str(&format!(
"{}_{} [label=\"{{ {} {} {} }}\"];\n",
block.get_block_name(),
i,
input_string,
block.get_block_name(),
output_string,
));
}
let mut edges_string = String::new();
for (i, block) in self.blocks.iter().enumerate()
{
let outputs = block.get_successors();
let output_types = block.get_output_type_names();
let block_name = block.get_block_name();
for (j, (output, output_type)) in outputs.iter().zip(output_types.iter()).enumerate()
{
let destination_block = output.block_index;
let destination_block_name = self.blocks[destination_block].get_block_name();
edges_string.push_str(&format!(
"{}_{}:o{} -> {}_{}:i{} [label=\"{}\"];\n",
block_name,
i,
j,
destination_block_name,
destination_block,
output.port_index,
output_type
));
}
}
format!(
"
digraph G {{
node [shape=record];
rankdir=TB;
{}
{}
}}
",
node_string, edges_string
)
}
}
impl Default for FlowGraph

View File

@ -5,3 +5,4 @@ pub mod block;
pub mod edge;
pub mod graph;
pub mod stream;
pub use oxydsp_flowgraph_macros::{BlockIO, sync_block};

View File

@ -1,50 +1 @@
use oxydsp_flowgraph::block::SyncBlock;
use oxydsp_flowgraph::edge::In;
use oxydsp_flowgraph::edge::Out;
use oxydsp_flowgraph_macros::BlockIO;
use oxydsp_flowgraph_macros::sync_block;
#[derive(BlockIO)]
//#[sync_block]
pub struct Test
{
#[input]
input: In<u32>,
#[output]
output: Out<u32>,
}
impl oxydsp_flowgraph::block::Block for Test
{
fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult
{
let mut len = usize::MAX;
let mut input_reader = self.input.read();
len = len.min(input_reader.len());
let mut output_writer = self.output.write();
len = len.min(output_writer.len());
//let input = input_reader.pop().unwrap();
let input = 0;
for _ in 0..len
{
let (output_out,) = self.sync_work((input,));
output_writer.push(output_out).unwrap();
}
oxydsp_flowgraph::block::BlockResult::Ok
}
}
impl SyncBlock for Test
{
type Input = (u32,);
type Output = (u32,);
fn sync_work(&mut self, (num,): Self::Input) -> Self::Output
{
(num,)
}
}
fn main() {}

View File

@ -1,3 +1,4 @@
use std::cell::Cell;
use std::cell::UnsafeCell;
use std::mem::MaybeUninit;
use std::ops::Deref;
@ -60,7 +61,7 @@ pub struct StreamWriter<'a, T>
second: Option<&'a UnsafeCell<[MaybeUninit<T>]>>,
first_len: usize,
second_len: usize,
written: usize,
written: Cell<usize>,
}
// Represents a read operation within a stream producer
@ -71,7 +72,7 @@ pub struct StreamReader<'a, T>
second: Option<&'a UnsafeCell<[MaybeUninit<T>]>>,
first_len: usize,
second_len: usize,
read: usize,
read: Cell<usize>,
}
pub fn bounded_queue<T>(capacity: usize) -> (StreamProducer<T>, StreamConsumer<T>)
@ -158,15 +159,21 @@ impl<T> StreamProducer<T>
// is thus the only on able to write or read the stream when it lives
let first_len = head_to_end.len();
let second_len = start_to_tail.len();
let first = std::mem::transmute(head_to_end);
let second = Some(std::mem::transmute(start_to_tail));
let first = std::mem::transmute::<
&mut [MaybeUninit<T>],
&UnsafeCell<[MaybeUninit<T>]>,
>(head_to_end);
let second = Some(std::mem::transmute::<
&mut [MaybeUninit<T>],
&UnsafeCell<[MaybeUninit<T>]>,
>(start_to_tail));
StreamWriter {
producer: self,
first,
second,
first_len,
second_len,
written: 0,
written: 0.into(),
}
}
}
@ -206,15 +213,23 @@ impl<T> StreamProducer<T>
let first_len = head_to_end.len();
let second_len = start_to_tail.len();
let first = std::mem::transmute(head_to_end);
let second = Some(std::mem::transmute(start_to_tail));
let first = std::mem::transmute::<
&mut [MaybeUninit<T>],
&UnsafeCell<[std::mem::MaybeUninit<T>]>,
>(head_to_end);
let second = Some(std::mem::transmute::<
&mut [MaybeUninit<T>],
&UnsafeCell<[MaybeUninit<T>]>,
>(start_to_tail));
StreamWriter {
producer: self,
first,
second,
first_len,
second_len,
written: 0,
written: 0.into(),
}
}
}
@ -245,11 +260,14 @@ impl<T> StreamProducer<T>
let k = &mut *self.inner.buffer.get();
StreamWriter {
producer: self,
first_len: k.len(),
first_len: wrapped_tail - wrapped_head,
second_len: 0,
first: std::mem::transmute(&k[wrapped_head..wrapped_tail]),
first: std::mem::transmute::<
&[MaybeUninit<T>],
&UnsafeCell<[MaybeUninit<T>]>,
>(&k[wrapped_head..wrapped_tail]),
second: None,
written: 0,
written: 0.into(),
}
}
}
@ -278,11 +296,13 @@ impl<T> StreamConsumer<T>
let k = &mut *self.inner.buffer.get();
StreamReader {
producer: self,
first_len: k.len(),
first_len: wrapped_head - wrapped_tail,
second_len: 0,
first: std::mem::transmute(&k[wrapped_tail..wrapped_head]),
first: std::mem::transmute::<&[MaybeUninit<T>], &UnsafeCell<[MaybeUninit<T>]>>(
&k[wrapped_tail..wrapped_head],
),
second: None,
read: 0,
read: 0.into(),
}
}
}
@ -309,11 +329,14 @@ impl<T> StreamConsumer<T>
let k = &mut *self.inner.buffer.get();
StreamReader {
producer: self,
first_len: k.len(),
first_len: wrapped_head - wrapped_tail,
second_len: 0,
first: std::mem::transmute(&k[wrapped_tail..wrapped_head]),
first: std::mem::transmute::<
&[MaybeUninit<T>],
&UnsafeCell<[MaybeUninit<T>]>,
>(&k[wrapped_tail..wrapped_head]),
second: None,
read: 0,
read: 0.into(),
}
}
}
@ -351,15 +374,23 @@ impl<T> StreamConsumer<T>
let first_len = tail_to_end.len();
let second_len = start_to_head.len();
let first = std::mem::transmute(tail_to_end);
let second = Some(std::mem::transmute(start_to_head));
let first = std::mem::transmute::<
&mut [MaybeUninit<T>],
&UnsafeCell<[MaybeUninit<T>]>,
>(tail_to_end);
let second = Some(std::mem::transmute::<
&mut [MaybeUninit<T>],
&UnsafeCell<[MaybeUninit<T>]>,
>(start_to_head));
StreamReader {
producer: self,
first,
second,
first_len,
second_len,
read: 0,
read: 0.into(),
}
}
}
@ -379,23 +410,24 @@ impl<'a, T> StreamWriter<'a, T>
self.len() == 0
}
pub fn push(&mut self, element: T) -> Result<(), T>
pub fn push(&self, element: T) -> Result<(), T>
{
if self.written < self.first_len
if self.written.get() < self.first_len
{
unsafe {
(&mut *self.first.get())[self.written] = MaybeUninit::new(element);
(&mut *self.first.get())[self.written.get()] = MaybeUninit::new(element);
}
self.written += 1;
self.written.set(self.written.get() + 1);
Ok(())
}
else if let Some(second) = &mut self.second
&& self.written - self.first_len < self.second_len
else if let Some(second) = &self.second
&& self.written.get() - self.first_len < self.second_len
{
unsafe {
(&mut *second.get())[self.written - self.first_len] = MaybeUninit::new(element);
(&mut *second.get())[self.written.get() - self.first_len] =
MaybeUninit::new(element);
}
self.written += 1;
self.written.set(self.written.get() + 1);
Ok(())
}
else
@ -417,9 +449,9 @@ impl<'a, T> StreamReader<'a, T>
self.len() == 0
}
pub fn pop(&mut self) -> Option<T>
pub fn pop(&self) -> Option<T>
{
if self.read < self.first_len
if self.read.get() < self.first_len
{
// SAFETY:
//
@ -427,25 +459,25 @@ impl<'a, T> StreamReader<'a, T>
// We take it once since read increases
let element = unsafe {
std::mem::replace(
&mut (&mut *self.first.get())[self.read],
&mut (&mut *self.first.get())[self.read.get()],
MaybeUninit::uninit(),
)
.assume_init()
};
self.read += 1;
self.read.set(self.read.get() + 1);
Some(element)
}
else if let Some(second) = &mut self.second
&& self.read - self.first_len < self.second_len
else if let Some(second) = &self.second
&& self.read.get() - self.first_len < self.second_len
{
let element = unsafe {
std::mem::replace(
&mut (&mut *second.get())[self.read - self.first_len],
&mut (&mut *second.get())[self.read.get() - self.first_len],
MaybeUninit::uninit(),
)
.assume_init()
};
self.read += 1;
self.read.set(self.read.get() + 1);
Some(element)
}
else
@ -469,7 +501,7 @@ impl<'a, T> Drop for StreamWriter<'a, T>
self.producer
.inner
.head
.store(head + self.written, Ordering::Release);
.store(head + self.written.get(), Ordering::Release);
}
}
@ -487,7 +519,7 @@ impl<'a, T> Drop for StreamReader<'a, T>
self.producer
.inner
.tail
.store(tail + self.read, Ordering::Release);
.store(tail + self.read.get(), Ordering::Release);
}
}
@ -503,7 +535,7 @@ mod test
let (mut tx, mut rx) = bounded_queue::<usize>(4);
{
let mut writer = tx.write();
let writer = tx.write();
assert_eq!(writer.len(), 4);
@ -515,7 +547,7 @@ mod test
}
{
let mut reader = rx.read();
let reader = rx.read();
assert_eq!(reader.len(), 4);
@ -528,7 +560,7 @@ mod test
// Put stream into weird situatino
{
let mut writer = tx.write();
let writer = tx.write();
assert_eq!(writer.push(1), Ok(()));
assert_eq!(writer.push(2), Ok(()));
assert_eq!(writer.push(3), Ok(()));
@ -536,20 +568,20 @@ mod test
}
{
let mut reader = rx.read();
let reader = rx.read();
assert_eq!(reader.pop(), Some(1));
assert_eq!(reader.pop(), Some(2));
}
{
let mut writer = tx.write();
let writer = tx.write();
assert_eq!(writer.len(), 2);
assert_eq!(writer.push(5), Ok(()));
assert_eq!(writer.push(6), Ok(()));
}
{
let mut reader = rx.read();
let reader = rx.read();
assert_eq!(reader.pop(), Some(3));
assert_eq!(reader.pop(), Some(4));
assert_eq!(reader.pop(), Some(5));