Compare commits
2 Commits
10e4509e8c
...
3cdc0e613a
| Author | SHA1 | Date | |
|---|---|---|---|
| 3cdc0e613a | |||
| 520d97726f |
4363
Cargo.lock
generated
4363
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
@ -6,3 +6,8 @@ edition = "2024"
|
||||
[dependencies]
|
||||
oxydsp-flowgraph = {path = "../oxydsp-flowgraph/"}
|
||||
oxydsp-dsp = {path = "../oxydsp-dsp/"}
|
||||
egui = "0.33.3"
|
||||
egui_plot = "0.34.1"
|
||||
eframe = { version = "0.33.3", features = ["default_fonts", "wayland"] }
|
||||
num = "0.4.3"
|
||||
hound = "3.5.1"
|
||||
|
||||
BIN
example/mod.wav
Normal file
BIN
example/mod.wav
Normal file
Binary file not shown.
@ -3,13 +3,19 @@
|
||||
node [shape=record];
|
||||
rankdir=TB;
|
||||
IterSource_0 [label="{ IterSource |{<o0> output} }"];
|
||||
IterSource_1 [label="{ IterSource |{<o0> output} }"];
|
||||
Adder_2 [label="{ {<i0> input_a|<i1> input_b}| Adder |{<o0> output} }"];
|
||||
Printer_3 [label="{ {<i0> input}| Printer }"];
|
||||
Map_1 [label="{ {<i0> input}| Map |{<o0> output} }"];
|
||||
Repeat_2 [label="{ {<i0> input}| Repeat |{<o0> output} }"];
|
||||
Nco_3 [label="{ {<i0> frequency}| Nco |{<o0> output} }"];
|
||||
OscillatorSource_4 [label="{ OscillatorSource |{<o0> output} }"];
|
||||
Multiplier_5 [label="{ {<i0> input_a|<i1> input_b}| Multiplier |{<o0> output} }"];
|
||||
TxSink_6 [label="{ {<i0> input}| TxSink }"];
|
||||
|
||||
IterSource_0:o0 -> Adder_2:i0 [label="usize"];
|
||||
IterSource_1:o0 -> Adder_2:i1 [label="usize"];
|
||||
Adder_2:o0 -> Printer_3:i0 [label="usize"];
|
||||
IterSource_0:o0 -> Map_1:i0 [label="bool"];
|
||||
Map_1:o0 -> Repeat_2:i0 [label="oxydsp_dsp::units::DigitalFrequency"];
|
||||
Repeat_2:o0 -> Nco_3:i0 [label="oxydsp_dsp::units::DigitalFrequency"];
|
||||
Nco_3:o0 -> Multiplier_5:i0 [label="num_complex::Complex<f32>"];
|
||||
OscillatorSource_4:o0 -> Multiplier_5:i1 [label="num_complex::Complex<f32>"];
|
||||
Multiplier_5:o0 -> TxSink_6:i0 [label="num_complex::Complex<f32>"];
|
||||
|
||||
}
|
||||
|
||||
@ -1,24 +1,35 @@
|
||||
use std::collections::VecDeque;
|
||||
use std::fmt::Display;
|
||||
use std::fs::File;
|
||||
use std::io::Write;
|
||||
use std::sync::mpmc::sync_channel;
|
||||
use std::sync::mpsc;
|
||||
|
||||
use oxydsp_dsp::blocks::math::basic::Adder;
|
||||
use eframe::NativeOptions;
|
||||
use egui_plot::Line;
|
||||
use egui_plot::PlotPoints;
|
||||
use num::Complex;
|
||||
use oxydsp_dsp::blocks::math::basic::Multiplier;
|
||||
use oxydsp_dsp::blocks::synthesis::Nco;
|
||||
use oxydsp_dsp::blocks::synthesis::OscillatorSource;
|
||||
use oxydsp_dsp::blocks::utilities::adapters::Map;
|
||||
use oxydsp_dsp::blocks::utilities::adapters::Repeat;
|
||||
use oxydsp_dsp::blocks::utilities::channels::TxSink;
|
||||
use oxydsp_dsp::blocks::utilities::iter::IterSource;
|
||||
use oxydsp_dsp::units::DigitalFrequency;
|
||||
use oxydsp_flowgraph::BlockIO;
|
||||
use oxydsp_flowgraph::block::Block;
|
||||
use oxydsp_flowgraph::block::BlockResult;
|
||||
use oxydsp_flowgraph::edge::In;
|
||||
use oxydsp_flowgraph::edge::PopIter;
|
||||
use oxydsp_flowgraph::edge::PopIterable;
|
||||
use oxydsp_flowgraph::flowgraph;
|
||||
use oxydsp_flowgraph::graph::FlowGraph;
|
||||
use oxydsp_flowgraph::stream::StreamReader;
|
||||
|
||||
//#[derive(BlockIO)]
|
||||
#[derive(BlockIO)]
|
||||
pub struct Printer<T: 'static>
|
||||
{
|
||||
//#[input]
|
||||
input: [In<T>; 3],
|
||||
#[input]
|
||||
input: In<T>,
|
||||
|
||||
n: usize,
|
||||
}
|
||||
@ -27,8 +38,7 @@ impl<T: 'static> Printer<T>
|
||||
{
|
||||
pub fn new(input: In<T>) -> Self
|
||||
{
|
||||
todo!()
|
||||
//Self { input, n: 0 }
|
||||
Self { input, n: 0 }
|
||||
}
|
||||
}
|
||||
|
||||
@ -38,12 +48,7 @@ where
|
||||
{
|
||||
fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult
|
||||
{
|
||||
let mut k: Vec<_> = self.input.iter_mut().map(|x| x.read()).collect();
|
||||
|
||||
k[0].pop();
|
||||
k[1].next();
|
||||
|
||||
for x in self.input[0].pop_iter()
|
||||
for x in self.input.pop_iter()
|
||||
{
|
||||
if self.n.is_multiple_of(2usize.pow(20))
|
||||
{
|
||||
@ -58,15 +63,104 @@ where
|
||||
|
||||
fn main()
|
||||
{
|
||||
let (iter_a, a) = IterSource::new(0usize..);
|
||||
let (iter_b, b) = IterSource::new(0usize..);
|
||||
let (adder, added) = Adder::new(a, b);
|
||||
let printer = Printer::new(added);
|
||||
let sample_rate = 48_000;
|
||||
let sample_per_symbol = 96;
|
||||
let deviation = DigitalFrequency::from_time_frequency(500., sample_rate as f64);
|
||||
let carrier = DigitalFrequency::from_time_frequency(1000., sample_rate as f64);
|
||||
|
||||
let graph = flowgraph![iter_a, iter_b, adder, printer];
|
||||
let data = (0..255u8).flat_map(to_bits).collect::<Vec<_>>();
|
||||
let (bit_stream, bits) = IterSource::new(data.into_iter());
|
||||
let (to_freq, freq) = Map::new(bits, move |x| [-deviation, deviation][x as usize]);
|
||||
let (repeat, freq) = Repeat::new(freq, sample_per_symbol);
|
||||
let (base_oscillator, baseband) = Nco::<f32>::new(freq);
|
||||
let (local_oscillator, lo) = OscillatorSource::<f32>::new(carrier.into());
|
||||
let (frontend, passband) = Multiplier::new(baseband, lo);
|
||||
let (tx, rx) = mpsc::channel::<Complex<f32>>();
|
||||
let sink = TxSink::new(passband, tx);
|
||||
|
||||
let graph = flowgraph![
|
||||
bit_stream,
|
||||
to_freq,
|
||||
repeat,
|
||||
base_oscillator,
|
||||
local_oscillator,
|
||||
frontend,
|
||||
sink,
|
||||
];
|
||||
File::create("out.dot")
|
||||
.unwrap()
|
||||
.write_all(graph.get_dot().as_bytes())
|
||||
.unwrap();
|
||||
graph.run();
|
||||
let j = graph.run();
|
||||
let mut output = vec![];
|
||||
while let Ok(x) = rx.recv()
|
||||
{
|
||||
output.push(x);
|
||||
}
|
||||
let _ = j.join();
|
||||
|
||||
// Write signal
|
||||
let spec = hound::WavSpec {
|
||||
channels: 1,
|
||||
sample_rate,
|
||||
bits_per_sample: 16,
|
||||
sample_format: hound::SampleFormat::Int,
|
||||
};
|
||||
let mut writer = hound::WavWriter::create("mod.wav", spec).unwrap();
|
||||
for x in output.iter()
|
||||
{
|
||||
let amplitude = i16::MAX as f32;
|
||||
writer.write_sample((x.re * amplitude) as i16).unwrap();
|
||||
}
|
||||
writer.finalize().unwrap();
|
||||
//
|
||||
|
||||
eframe::run_simple_native("Plot", NativeOptions::default(), move |ctx, _frame| {
|
||||
egui::CentralPanel::default().show(ctx, |ui| {
|
||||
egui_plot::Plot::new("hello").show(ui, |plot_ui| {
|
||||
plot_ui.line(Line::new(
|
||||
"samples",
|
||||
output
|
||||
.iter()
|
||||
.enumerate()
|
||||
.map(|(i, s)| [i as f64, s.re as f64])
|
||||
.collect::<PlotPoints>(),
|
||||
));
|
||||
});
|
||||
ctx.request_repaint();
|
||||
});
|
||||
})
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
pub fn to_bits(n: u8) -> [bool; 8]
|
||||
{
|
||||
[
|
||||
(n & 1) == 1,
|
||||
(n >> 1) & 1 == 1,
|
||||
(n >> 2) & 1 == 1,
|
||||
(n >> 3) & 1 == 1,
|
||||
(n >> 4) & 1 == 1,
|
||||
(n >> 5) & 1 == 1,
|
||||
(n >> 6) & 1 == 1,
|
||||
(n >> 7) & 1 == 1,
|
||||
]
|
||||
}
|
||||
|
||||
pub fn from_bits(n: [bool; 8]) -> u8
|
||||
{
|
||||
(n[0] as u8)
|
||||
| ((n[1] as u8) << 1)
|
||||
| ((n[2] as u8) << 2)
|
||||
| ((n[3] as u8) << 3)
|
||||
| ((n[4] as u8) << 4)
|
||||
| ((n[5] as u8) << 5)
|
||||
| ((n[6] as u8) << 6)
|
||||
| ((n[7] as u8) << 7)
|
||||
}
|
||||
|
||||
pub fn gaussian(sigma: f32, t: f32) -> f32
|
||||
{
|
||||
let sq = (t - 0.5) / sigma;
|
||||
(-sq * sq).exp()
|
||||
}
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
use std::ops::Add;
|
||||
use std::ops::{Add, Mul};
|
||||
|
||||
use oxydsp_flowgraph::{
|
||||
BlockIO,
|
||||
@ -59,3 +59,57 @@ where
|
||||
BlockResult::Ok
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(BlockIO)]
|
||||
pub struct Multiplier<Ia, Ib, O>
|
||||
where
|
||||
Ia: Mul<Ib, Output = O> + 'static,
|
||||
Ib: 'static,
|
||||
O: 'static,
|
||||
{
|
||||
#[input]
|
||||
input_a: In<Ia>,
|
||||
|
||||
#[input]
|
||||
input_b: In<Ib>,
|
||||
|
||||
#[output]
|
||||
output: Out<O>,
|
||||
}
|
||||
|
||||
impl<Ia, Ib, O> Multiplier<Ia, Ib, O>
|
||||
where
|
||||
Ia: Mul<Ib, Output = O> + 'static,
|
||||
Ib: 'static,
|
||||
O: 'static,
|
||||
{
|
||||
pub fn new(input_a: In<Ia>, input_b: In<Ib>) -> (Self, In<O>)
|
||||
{
|
||||
let (output, added) = stream();
|
||||
(
|
||||
Self {
|
||||
input_a,
|
||||
input_b,
|
||||
output,
|
||||
},
|
||||
added,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl<Ia, Ib, O> Block for Multiplier<Ia, Ib, O>
|
||||
where
|
||||
Ia: Mul<Ib, Output = O> + 'static,
|
||||
Ib: 'static,
|
||||
O: 'static,
|
||||
{
|
||||
fn work(&mut self) -> BlockResult
|
||||
{
|
||||
self.output.push_iter(
|
||||
(&mut self.input_a, &mut self.input_b)
|
||||
.pop_iter()
|
||||
.map(|(a, b)| a * b),
|
||||
);
|
||||
BlockResult::Ok
|
||||
}
|
||||
}
|
||||
|
||||
@ -50,7 +50,15 @@ pub struct Nco<T: Float + From<f32> + 'static>
|
||||
|
||||
impl<T: Float + From<f32> + 'static> Nco<T>
|
||||
{
|
||||
pub fn new(
|
||||
pub fn new(input: In<DigitalFrequency>) -> (Self, In<Complex<T>>)
|
||||
{
|
||||
Self::with(
|
||||
input,
|
||||
crate::synthesis::oscillator::Nco::<T>::new(DigitalFrequency(0)),
|
||||
)
|
||||
}
|
||||
|
||||
pub fn with(
|
||||
input: In<DigitalFrequency>,
|
||||
nco: crate::synthesis::oscillator::Nco<T>,
|
||||
) -> (Self, In<Complex<T>>)
|
||||
|
||||
@ -1,2 +1,3 @@
|
||||
pub mod adapters;
|
||||
pub mod channels;
|
||||
pub mod iter;
|
||||
|
||||
@ -1 +1,97 @@
|
||||
use oxydsp_flowgraph::{
|
||||
BlockIO,
|
||||
block::{Block, BlockResult},
|
||||
edge::{In, Out, PopIterable, stream},
|
||||
};
|
||||
|
||||
#[derive(BlockIO)]
|
||||
pub struct Map<I: 'static, O: 'static, F>
|
||||
{
|
||||
#[input]
|
||||
input: In<I>,
|
||||
|
||||
#[output]
|
||||
output: Out<O>,
|
||||
|
||||
map: F,
|
||||
}
|
||||
|
||||
impl<I: 'static, O: 'static, F> Map<I, O, F>
|
||||
where
|
||||
F: Fn(I) -> O,
|
||||
{
|
||||
pub fn new(input: In<I>, map: F) -> (Self, In<O>)
|
||||
{
|
||||
let (output, mapped) = stream();
|
||||
(Self { input, output, map }, mapped)
|
||||
}
|
||||
}
|
||||
|
||||
impl<I: 'static, O: 'static, F> Block for Map<I, O, F>
|
||||
where
|
||||
F: Fn(I) -> O,
|
||||
{
|
||||
fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult
|
||||
{
|
||||
self.output.push_iter(self.input.pop_iter().map(&self.map));
|
||||
BlockResult::Ok
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(BlockIO)]
|
||||
pub struct Repeat<T: 'static>
|
||||
{
|
||||
#[input]
|
||||
input: In<T>,
|
||||
|
||||
repetitions: usize,
|
||||
remaining: usize,
|
||||
current: Option<T>,
|
||||
|
||||
#[output]
|
||||
output: Out<T>,
|
||||
}
|
||||
|
||||
impl<T: 'static> Repeat<T>
|
||||
{
|
||||
pub fn new(input: In<T>, repetitions: usize) -> (Self, In<T>)
|
||||
{
|
||||
let (output, repeated) = stream();
|
||||
(
|
||||
Self {
|
||||
input,
|
||||
repetitions,
|
||||
remaining: repetitions,
|
||||
current: None,
|
||||
output,
|
||||
},
|
||||
repeated,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Clone + 'static> Block for Repeat<T>
|
||||
{
|
||||
fn work(&mut self) -> BlockResult
|
||||
{
|
||||
let writer = self.output.write();
|
||||
let reader = self.input.read();
|
||||
let len = writer.len().min(reader.len() / self.repetitions + 1);
|
||||
|
||||
for _ in 0..len
|
||||
{
|
||||
if self.remaining == 0 || self.current.is_none()
|
||||
{
|
||||
self.current = Some(reader.pop().unwrap());
|
||||
self.remaining = self.repetitions;
|
||||
}
|
||||
|
||||
writer
|
||||
.push(self.current.clone().unwrap())
|
||||
.unwrap_or_else(|_| panic!());
|
||||
self.remaining -= 1;
|
||||
}
|
||||
|
||||
BlockResult::Ok
|
||||
}
|
||||
}
|
||||
|
||||
95
oxydsp-dsp/src/blocks/utilities/channels.rs
Normal file
95
oxydsp-dsp/src/blocks/utilities/channels.rs
Normal file
@ -0,0 +1,95 @@
|
||||
use std::sync::mpsc::{Receiver, Sender, SyncSender};
|
||||
|
||||
use oxydsp_flowgraph::{
|
||||
BlockIO,
|
||||
block::{Block, BlockResult},
|
||||
edge::{In, Out, PopIterable, stream},
|
||||
};
|
||||
|
||||
#[derive(BlockIO)]
|
||||
pub struct RxSource<Rx, I: 'static>
|
||||
{
|
||||
input: Rx,
|
||||
|
||||
#[output]
|
||||
output: Out<I>,
|
||||
}
|
||||
|
||||
#[derive(BlockIO)]
|
||||
pub struct TxSink<Tx, I: 'static>
|
||||
{
|
||||
#[input]
|
||||
input: In<I>,
|
||||
|
||||
output: Tx,
|
||||
}
|
||||
|
||||
impl<Rx, I: 'static> RxSource<Rx, I>
|
||||
{
|
||||
pub fn new(input: Rx) -> (Self, In<I>)
|
||||
{
|
||||
let (output, source) = stream();
|
||||
(Self { input, output }, source)
|
||||
}
|
||||
}
|
||||
|
||||
impl<Tx, I: 'static> TxSink<Tx, I>
|
||||
{
|
||||
pub fn new(input: In<I>, output: Tx) -> Self
|
||||
{
|
||||
Self { input, output }
|
||||
}
|
||||
}
|
||||
|
||||
impl<I: 'static> Block for RxSource<Receiver<I>, I>
|
||||
{
|
||||
fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult
|
||||
{
|
||||
if self.output.push_iter(self.input.iter())
|
||||
{
|
||||
BlockResult::Ok
|
||||
}
|
||||
else
|
||||
{
|
||||
BlockResult::Terminated
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<I: 'static> Block for TxSink<Sender<I>, I>
|
||||
{
|
||||
fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult
|
||||
{
|
||||
if self
|
||||
.input
|
||||
.pop_iter()
|
||||
.map(|x| self.output.send(x))
|
||||
.any(|res| res.is_err())
|
||||
{
|
||||
BlockResult::Terminated
|
||||
}
|
||||
else
|
||||
{
|
||||
BlockResult::Ok
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<I: 'static> Block for TxSink<SyncSender<I>, I>
|
||||
{
|
||||
fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult
|
||||
{
|
||||
if self
|
||||
.input
|
||||
.pop_iter()
|
||||
.map(|x| self.output.send(x))
|
||||
.any(|res| res.is_err())
|
||||
{
|
||||
BlockResult::Terminated
|
||||
}
|
||||
else
|
||||
{
|
||||
BlockResult::Ok
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -51,7 +51,8 @@ impl<T> Nco<T>
|
||||
|
||||
pub fn step(&mut self)
|
||||
{
|
||||
let _ = self.phase.overflowing_add(self.d_phase);
|
||||
let (new, _) = self.phase.overflowing_add(self.d_phase);
|
||||
self.phase = new;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
use std::f64::consts::PI;
|
||||
use std::{f64::consts::PI, ops::Neg};
|
||||
|
||||
use crate::map;
|
||||
|
||||
@ -15,7 +15,7 @@ impl DigitalFrequency
|
||||
{
|
||||
// Frequnecy wraps arround : Going at 2 pi radians per second
|
||||
// Is like not oscillating at all
|
||||
let f = radians_per_sample.rem_euclid(radians_per_sample);
|
||||
let f = radians_per_sample.rem_euclid(2. * PI);
|
||||
|
||||
// Then we map the [0, 2*PI] range into the 0 to usize range
|
||||
DigitalFrequency(map(f, 0., 2. * PI, 0., usize::MAX as f64).floor() as usize)
|
||||
@ -36,3 +36,13 @@ impl DigitalFrequency
|
||||
map(self.0 as f64, 0., usize::MAX as f64, 0., sample_rate)
|
||||
}
|
||||
}
|
||||
|
||||
impl Neg for DigitalFrequency
|
||||
{
|
||||
type Output = Self;
|
||||
|
||||
fn neg(self) -> Self::Output
|
||||
{
|
||||
DigitalFrequency(usize::MAX - self.0)
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,279 +0,0 @@
|
||||
use proc_macro::TokenStream;
|
||||
use zyn::ToTokens;
|
||||
use zyn::ext::AttrExt;
|
||||
use zyn::ext::FieldsExt;
|
||||
use zyn::syn::Attribute;
|
||||
use zyn::syn::Index;
|
||||
use zyn::syn::spanned::Spanned;
|
||||
|
||||
pub enum BlockIOPort
|
||||
{
|
||||
Field(TokenStream),
|
||||
Array(TokenStream, TokenStream),
|
||||
}
|
||||
|
||||
pub struct BlockIOPorts
|
||||
{
|
||||
inputs: Vec<BlockIOPort>,
|
||||
outputs: Vec<BlockIOPort>,
|
||||
}
|
||||
|
||||
pub fn parse_ports(fields: zyn::syn::Fields, attribute: &str) -> Vec<BlockIOPort>
|
||||
{
|
||||
let mut ports = vec![];
|
||||
let fields = fields.as_named().unwrap();
|
||||
|
||||
for field in fields.named.iter()
|
||||
{
|
||||
if field.attrs.iter().any(|attr| attr.is(attribute))
|
||||
{
|
||||
ports.append(&mut parse_port(field.clone()));
|
||||
}
|
||||
}
|
||||
|
||||
ports.iter().for_each(|x| match x
|
||||
{
|
||||
BlockIOPort::Field(token_stream) => println!("field: {}", token_stream),
|
||||
BlockIOPort::Array(token_stream, token_stream1) =>
|
||||
{
|
||||
println!("array: {} [{}]", token_stream, token_stream1)
|
||||
}
|
||||
});
|
||||
|
||||
ports
|
||||
}
|
||||
|
||||
pub fn parse_port(field: zyn::syn::Field) -> Vec<BlockIOPort>
|
||||
{
|
||||
match field.ty
|
||||
{
|
||||
zyn::syn::Type::Path(type_path) =>
|
||||
{
|
||||
println!("{:?}", type_path.path);
|
||||
assert!(type_path.path.is_ident("In") || type_path.path.is_ident("Out"));
|
||||
vec![BlockIOPort::Field(
|
||||
field.ident.unwrap().to_token_stream().into(),
|
||||
)]
|
||||
}
|
||||
zyn::syn::Type::Tuple(type_tuple) =>
|
||||
{
|
||||
let mut output = vec![];
|
||||
for (i, _) in type_tuple.elems.iter().enumerate()
|
||||
{
|
||||
output.push(BlockIOPort::Field(
|
||||
zyn::zyn!({{field.ident.clone().unwrap()}}.{{ Index::from(i) }})
|
||||
.to_token_stream()
|
||||
.into(),
|
||||
));
|
||||
}
|
||||
output
|
||||
}
|
||||
zyn::syn::Type::Array(type_array) =>
|
||||
{
|
||||
let b = BlockIOPort::Array(
|
||||
field.ident.clone().unwrap().to_token_stream().into(),
|
||||
type_array.len.to_token_stream().into(),
|
||||
);
|
||||
vec![b]
|
||||
}
|
||||
_ => panic!("Unsupported port type."),
|
||||
}
|
||||
}
|
||||
|
||||
#[zyn::element]
|
||||
pub fn block_io_set_index(fields: zyn::syn::Fields) -> zyn::TokenStream
|
||||
{
|
||||
let fields = fields.as_named().unwrap().named.clone();
|
||||
zyn::zyn!(
|
||||
fn set_index(&self, block_index: usize)
|
||||
{
|
||||
use oxydsp_flowgraph::edge::BlockIOIndex;
|
||||
@for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("input"))).enumerate())
|
||||
{
|
||||
self.{{field.1.ident}}.set_block_index(BlockIOIndex {block_index, port_index: {{ field.0 }} });
|
||||
}
|
||||
|
||||
@for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("output"))).enumerate())
|
||||
{
|
||||
self.{{field.1.ident}}.set_block_index(BlockIOIndex {block_index, port_index: {{ field.0 }} });
|
||||
}
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
#[zyn::element]
|
||||
pub fn block_io_get_successors(fields: zyn::syn::Fields) -> zyn::TokenStream
|
||||
{
|
||||
let fields = fields.as_named().unwrap().named.clone();
|
||||
zyn::zyn!(
|
||||
fn get_successors(&self) -> Vec<oxydsp_flowgraph::edge::BlockIOIndex>
|
||||
{
|
||||
let mut output = vec![];
|
||||
@for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("output"))).enumerate())
|
||||
{
|
||||
if let Some(block_index) = self.{{ field.1.ident }}.get_consumer_block()
|
||||
{
|
||||
output.push(block_index);
|
||||
}
|
||||
}
|
||||
output
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
#[zyn::element]
|
||||
pub fn block_io_get_meta(ident: zyn::syn::Ident, fields: zyn::syn::Fields) -> zyn::TokenStream
|
||||
{
|
||||
let fields = fields.as_named().unwrap().named.clone();
|
||||
zyn::zyn!(
|
||||
fn get_block_name(&self) -> &'static str
|
||||
{
|
||||
return {{ ident.to_string() }};
|
||||
}
|
||||
|
||||
fn get_input_names(&self) -> Vec<&'static str>
|
||||
{
|
||||
let mut output = Vec::new();
|
||||
@for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("input"))).enumerate())
|
||||
{
|
||||
output.push({{ field.1.ident.clone().unwrap().to_string() }});
|
||||
}
|
||||
return output;
|
||||
}
|
||||
fn get_output_names(&self) -> Vec<&'static str>
|
||||
{
|
||||
let mut output = Vec::new();
|
||||
@for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("output"))).enumerate())
|
||||
{
|
||||
output.push({{ field.1.ident.clone().unwrap().to_string() }});
|
||||
}
|
||||
return output;
|
||||
}
|
||||
|
||||
fn get_output_type_names(&self) -> Vec<&'static str>
|
||||
{
|
||||
let mut output = Vec::new();
|
||||
@for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("output"))).enumerate())
|
||||
{
|
||||
output.push(self.{{ field.1.ident.clone() }}.get_type_name());
|
||||
}
|
||||
return output;
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
#[zyn::element]
|
||||
pub fn block_io_counts(fields: zyn::syn::Fields) -> zyn::TokenStream
|
||||
{
|
||||
let fields = fields.as_named().unwrap().named.clone();
|
||||
let input_count = fields
|
||||
.iter()
|
||||
.filter(|x| x.attrs.iter().any(|x| x.is("input")))
|
||||
.count();
|
||||
let output_count = fields
|
||||
.iter()
|
||||
.filter(|x| x.attrs.iter().any(|x| x.is("output")))
|
||||
.count();
|
||||
zyn::zyn!(
|
||||
fn input_count(&self) -> usize
|
||||
{
|
||||
return { { input_count } };
|
||||
}
|
||||
|
||||
fn output_count(&self) -> usize
|
||||
{
|
||||
return { { output_count } };
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
#[zyn::element(debug = "pretty")]
|
||||
pub fn block_io_set_streams(fields: zyn::syn::Fields) -> zyn::TokenStream
|
||||
{
|
||||
zyn::zyn!(
|
||||
#[allow(unreachable_code)]
|
||||
fn set_anonymous_out_stream(
|
||||
&mut self,
|
||||
output_index: usize,
|
||||
producer: oxydsp_flowgraph::edge::AnonymousStreamProducer,
|
||||
)
|
||||
{
|
||||
match output_index
|
||||
{
|
||||
@for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("output"))).enumerate())
|
||||
{
|
||||
{{ field.0 }} => self.{{field.1.ident}}.set_anonymous_stream(producer),
|
||||
}
|
||||
_ => panic!("output_index out of bounds.")
|
||||
};
|
||||
}
|
||||
|
||||
#[allow(unreachable_code)]
|
||||
fn set_anonymous_in_stream(&mut self, input_index: usize, consumer: oxydsp_flowgraph::edge::AnonymousStreamConsumer)
|
||||
{
|
||||
match input_index
|
||||
{
|
||||
@for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("input"))).enumerate())
|
||||
{
|
||||
{{ field.0 }} => self.{{field.1.ident}}.set_anonymous_stream(consumer),
|
||||
}
|
||||
_ => panic!("output_index out of bounds.")
|
||||
};
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
#[zyn::element]
|
||||
pub fn block_io_create_stream(fields: zyn::syn::Fields) -> zyn::TokenStream
|
||||
{
|
||||
zyn::zyn!(
|
||||
#[allow(unreachable_code)]
|
||||
fn create_anonymous_stream_for(
|
||||
&mut self,
|
||||
output_index: usize,
|
||||
capacity: usize
|
||||
) -> (oxydsp_flowgraph::edge::AnonymousStreamProducer, oxydsp_flowgraph::edge::AnonymousStreamConsumer)
|
||||
{
|
||||
let (tx, rx): (oxydsp_flowgraph::edge::AnonymousStreamProducer, oxydsp_flowgraph::edge::AnonymousStreamConsumer)
|
||||
= match output_index
|
||||
{
|
||||
@for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("output"))).enumerate())
|
||||
{
|
||||
{{ field.0 }} =>
|
||||
{
|
||||
let (tx, rx) = oxydsp_flowgraph::stream::bounded_queue::<@out_inner_type(ty = field.1.ty.clone())>(capacity);
|
||||
(tx.into(), rx.into())
|
||||
},
|
||||
}
|
||||
_ => panic!("output_index out of bounds.")
|
||||
};
|
||||
|
||||
(tx, rx)
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
#[zyn::element]
|
||||
pub fn out_inner_type(ty: zyn::syn::Type) -> zyn::TokenStream
|
||||
{
|
||||
let out_ty = match ty
|
||||
{
|
||||
zyn::syn::Type::Path(type_path) => match &type_path.path.segments.last().unwrap().arguments
|
||||
{
|
||||
zyn::syn::PathArguments::AngleBracketed(args) => match args.args.first().unwrap()
|
||||
{
|
||||
zyn::syn::GenericArgument::Type(x) => Some(x.clone().to_token_stream()),
|
||||
_ => None,
|
||||
},
|
||||
_ => None,
|
||||
},
|
||||
_ => None,
|
||||
};
|
||||
|
||||
if out_ty.is_none()
|
||||
{
|
||||
bail!("Output type must be a Out<T> type."; span = ty.span());
|
||||
}
|
||||
|
||||
out_ty.unwrap()
|
||||
}
|
||||
@ -8,9 +8,6 @@ use zyn::syn::Index;
|
||||
use zyn::syn::Lit;
|
||||
use zyn::syn::spanned::Spanned;
|
||||
|
||||
mod block_io;
|
||||
use crate::block_io::*;
|
||||
|
||||
#[zyn::derive("BlockIO", attributes(input, output), debug = "pretty")]
|
||||
pub fn block_io(
|
||||
#[zyn(input)] ident: zyn::Extract<zyn::syn::Ident>,
|
||||
@ -18,7 +15,6 @@ pub fn block_io(
|
||||
#[zyn(input)] fields: zyn::Fields,
|
||||
) -> zyn::TokenStream
|
||||
{
|
||||
//parse_ports(fields.clone(), "input");
|
||||
let ident = ident.inner();
|
||||
let (impl_generics, type_generics, where_clause) = generics.split_for_impl();
|
||||
zyn::zyn!(
|
||||
@ -35,6 +31,204 @@ pub fn block_io(
|
||||
)
|
||||
}
|
||||
|
||||
#[zyn::element]
|
||||
fn block_io_set_index(fields: zyn::syn::Fields) -> zyn::TokenStream
|
||||
{
|
||||
let fields = fields.as_named().unwrap().named.clone();
|
||||
zyn::zyn!(
|
||||
fn set_index(&self, block_index: usize)
|
||||
{
|
||||
use oxydsp_flowgraph::edge::BlockIOIndex;
|
||||
@for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("input"))).enumerate())
|
||||
{
|
||||
self.{{field.1.ident}}.set_block_index(BlockIOIndex {block_index, port_index: {{ field.0 }} });
|
||||
}
|
||||
|
||||
@for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("output"))).enumerate())
|
||||
{
|
||||
self.{{field.1.ident}}.set_block_index(BlockIOIndex {block_index, port_index: {{ field.0 }} });
|
||||
}
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
#[zyn::element]
|
||||
fn block_io_get_successors(fields: zyn::syn::Fields) -> zyn::TokenStream
|
||||
{
|
||||
let fields = fields.as_named().unwrap().named.clone();
|
||||
zyn::zyn!(
|
||||
fn get_successors(&self) -> Vec<oxydsp_flowgraph::edge::BlockIOIndex>
|
||||
{
|
||||
let mut output = vec![];
|
||||
@for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("output"))).enumerate())
|
||||
{
|
||||
if let Some(block_index) = self.{{ field.1.ident }}.get_consumer_block()
|
||||
{
|
||||
output.push(block_index);
|
||||
}
|
||||
}
|
||||
output
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
#[zyn::element]
|
||||
fn block_io_get_meta(ident: zyn::syn::Ident, fields: zyn::syn::Fields) -> zyn::TokenStream
|
||||
{
|
||||
let fields = fields.as_named().unwrap().named.clone();
|
||||
zyn::zyn!(
|
||||
fn get_block_name(&self) -> &'static str
|
||||
{
|
||||
return {{ ident.to_string() }};
|
||||
}
|
||||
|
||||
fn get_input_names(&self) -> Vec<&'static str>
|
||||
{
|
||||
let mut output = Vec::new();
|
||||
@for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("input"))).enumerate())
|
||||
{
|
||||
output.push({{ field.1.ident.clone().unwrap().to_string() }});
|
||||
}
|
||||
return output;
|
||||
}
|
||||
fn get_output_names(&self) -> Vec<&'static str>
|
||||
{
|
||||
let mut output = Vec::new();
|
||||
@for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("output"))).enumerate())
|
||||
{
|
||||
output.push({{ field.1.ident.clone().unwrap().to_string() }});
|
||||
}
|
||||
return output;
|
||||
}
|
||||
|
||||
fn get_output_type_names(&self) -> Vec<&'static str>
|
||||
{
|
||||
let mut output = Vec::new();
|
||||
@for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("output"))).enumerate())
|
||||
{
|
||||
output.push(self.{{ field.1.ident.clone() }}.get_type_name());
|
||||
}
|
||||
return output;
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
#[zyn::element]
|
||||
fn block_io_counts(fields: zyn::syn::Fields) -> zyn::TokenStream
|
||||
{
|
||||
let fields = fields.as_named().unwrap().named.clone();
|
||||
let input_count = fields
|
||||
.iter()
|
||||
.filter(|x| x.attrs.iter().any(|x| x.is("input")))
|
||||
.count();
|
||||
let output_count = fields
|
||||
.iter()
|
||||
.filter(|x| x.attrs.iter().any(|x| x.is("output")))
|
||||
.count();
|
||||
zyn::zyn!(
|
||||
fn input_count(&self) -> usize
|
||||
{
|
||||
return { { input_count } };
|
||||
}
|
||||
|
||||
fn output_count(&self) -> usize
|
||||
{
|
||||
return { { output_count } };
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
#[zyn::element(debug = "pretty")]
|
||||
fn block_io_set_streams(fields: zyn::syn::Fields) -> zyn::TokenStream
|
||||
{
|
||||
zyn::zyn!(
|
||||
#[allow(unreachable_code)]
|
||||
fn set_anonymous_out_stream(
|
||||
&mut self,
|
||||
output_index: usize,
|
||||
producer: oxydsp_flowgraph::edge::AnonymousStreamProducer,
|
||||
)
|
||||
{
|
||||
match output_index
|
||||
{
|
||||
@for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("output"))).enumerate())
|
||||
{
|
||||
{{ field.0 }} => self.{{field.1.ident}}.set_anonymous_stream(producer),
|
||||
}
|
||||
_ => panic!("output_index out of bounds.")
|
||||
};
|
||||
}
|
||||
|
||||
#[allow(unreachable_code)]
|
||||
fn set_anonymous_in_stream(&mut self, input_index: usize, consumer: oxydsp_flowgraph::edge::AnonymousStreamConsumer)
|
||||
{
|
||||
match input_index
|
||||
{
|
||||
@for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("input"))).enumerate())
|
||||
{
|
||||
{{ field.0 }} => self.{{field.1.ident}}.set_anonymous_stream(consumer),
|
||||
}
|
||||
_ => panic!("output_index out of bounds.")
|
||||
};
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
#[zyn::element]
|
||||
fn block_io_create_stream(fields: zyn::syn::Fields) -> zyn::TokenStream
|
||||
{
|
||||
zyn::zyn!(
|
||||
#[allow(unreachable_code)]
|
||||
fn create_anonymous_stream_for(
|
||||
&mut self,
|
||||
output_index: usize,
|
||||
capacity: usize
|
||||
) -> (oxydsp_flowgraph::edge::AnonymousStreamProducer, oxydsp_flowgraph::edge::AnonymousStreamConsumer)
|
||||
{
|
||||
let (tx, rx): (oxydsp_flowgraph::edge::AnonymousStreamProducer, oxydsp_flowgraph::edge::AnonymousStreamConsumer)
|
||||
= match output_index
|
||||
{
|
||||
@for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("output"))).enumerate())
|
||||
{
|
||||
{{ field.0 }} =>
|
||||
{
|
||||
let (tx, rx) = oxydsp_flowgraph::stream::bounded_queue::<@out_inner_type(ty = field.1.ty.clone())>(capacity);
|
||||
(tx.into(), rx.into())
|
||||
},
|
||||
}
|
||||
_ => panic!("output_index out of bounds.")
|
||||
};
|
||||
|
||||
(tx, rx)
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
#[zyn::element]
|
||||
fn out_inner_type(ty: zyn::syn::Type) -> zyn::TokenStream
|
||||
{
|
||||
let out_ty = match ty
|
||||
{
|
||||
zyn::syn::Type::Path(type_path) => match &type_path.path.segments.last().unwrap().arguments
|
||||
{
|
||||
zyn::syn::PathArguments::AngleBracketed(args) => match args.args.first().unwrap()
|
||||
{
|
||||
zyn::syn::GenericArgument::Type(x) => Some(x.clone().to_token_stream()),
|
||||
_ => None,
|
||||
},
|
||||
_ => None,
|
||||
},
|
||||
_ => None,
|
||||
};
|
||||
|
||||
if out_ty.is_none()
|
||||
{
|
||||
bail!("Output type must be a Out<T> type."; span = ty.span());
|
||||
}
|
||||
|
||||
out_ty.unwrap()
|
||||
}
|
||||
|
||||
// Sync block
|
||||
|
||||
#[zyn::attribute]
|
||||
|
||||
@ -1,3 +1,5 @@
|
||||
use std::thread::JoinHandle;
|
||||
|
||||
use crate::block::GraphableBlock;
|
||||
|
||||
#[macro_export]
|
||||
@ -33,22 +35,35 @@ impl FlowGraph
|
||||
self.blocks.push(Box::new(block));
|
||||
}
|
||||
|
||||
pub fn run(mut self)
|
||||
pub fn run(mut self) -> JoinHandle<()>
|
||||
{
|
||||
self.populate_edges();
|
||||
let mut k = vec![];
|
||||
for mut x in self.blocks.into_iter()
|
||||
{
|
||||
k.push(std::thread::spawn(move || {
|
||||
loop
|
||||
|
||||
std::thread::spawn(move || {
|
||||
'outer: loop
|
||||
{
|
||||
for x in self.blocks.iter_mut()
|
||||
{
|
||||
match x.work()
|
||||
{
|
||||
crate::block::BlockResult::Ok =>
|
||||
{}
|
||||
crate::block::BlockResult::Terminated =>
|
||||
{
|
||||
break 'outer;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for _ in 0..10_000
|
||||
{
|
||||
for x in self.blocks.iter_mut()
|
||||
{
|
||||
x.work();
|
||||
}
|
||||
}));
|
||||
}
|
||||
k.into_iter().for_each(|j| {
|
||||
let _ = j.join();
|
||||
});
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fn populate_edges(&mut self)
|
||||
|
||||
@ -48,10 +48,10 @@ pub struct StreamConsumer<T>
|
||||
}
|
||||
|
||||
unsafe impl<T: Send> Send for StreamProducer<T> {}
|
||||
unsafe impl<T> Sync for StreamProducer<T> {}
|
||||
unsafe impl<T: Send> Sync for StreamProducer<T> {}
|
||||
|
||||
unsafe impl<T: Send> Send for StreamConsumer<T> {}
|
||||
unsafe impl<T> Sync for StreamConsumer<T> {}
|
||||
unsafe impl<T: Send> Sync for StreamConsumer<T> {}
|
||||
|
||||
// Represents a write operation within a stream producer
|
||||
pub struct StreamWriter<'a, T>
|
||||
|
||||
Reference in New Issue
Block a user