Starting fsk demod
This commit is contained in:
BIN
example/mod.wav
BIN
example/mod.wav
Binary file not shown.
@ -8,14 +8,16 @@ Repeat_2 [label="{ {<i0> input}| Repeat |{<o0> output} }"];
|
||||
Nco_3 [label="{ {<i0> frequency}| Nco |{<o0> output} }"];
|
||||
OscillatorSource_4 [label="{ OscillatorSource |{<o0> output} }"];
|
||||
Multiplier_5 [label="{ {<i0> input_a|<i1> input_b}| Multiplier |{<o0> output} }"];
|
||||
TxSink_6 [label="{ {<i0> input}| TxSink }"];
|
||||
MapResultTagged_6 [label="{ {<i0> input}| MapResultTagged |{<o0> output} }"];
|
||||
NullSink_7 [label="{ {<i0> input}| NullSink }"];
|
||||
|
||||
IterSource_0:o0 -> Map_1:i0 [label="bool"];
|
||||
Map_1:o0 -> Repeat_2:i0 [label="oxydsp_dsp::units::DigitalFrequency"];
|
||||
Repeat_2:o0 -> Nco_3:i0 [label="oxydsp_dsp::units::DigitalFrequency"];
|
||||
Nco_3:o0 -> Multiplier_5:i0 [label="num_complex::Complex<f32>"];
|
||||
OscillatorSource_4:o0 -> Multiplier_5:i1 [label="num_complex::Complex<f32>"];
|
||||
Multiplier_5:o0 -> TxSink_6:i0 [label="num_complex::Complex<f32>"];
|
||||
Multiplier_5:o0 -> MapResultTagged_6:i0 [label="num_complex::Complex<f32>"];
|
||||
MapResultTagged_6:o0 -> NullSink_7:i0 [label="num_complex::Complex<f32>"];
|
||||
|
||||
}
|
||||
|
||||
@ -4,17 +4,30 @@ use std::io::Write;
|
||||
use std::sync::mpsc;
|
||||
|
||||
use eframe::NativeOptions;
|
||||
use egui::Color32;
|
||||
use egui_plot::Line;
|
||||
use egui_plot::MarkerShape;
|
||||
use egui_plot::PlotPoints;
|
||||
use egui_plot::Points;
|
||||
use egui_plot::Polygon;
|
||||
use egui_plot::VLine;
|
||||
use num::Complex;
|
||||
use num::Zero;
|
||||
use oxydsp_dsp::blocks::filtering::fir::FirFilter;
|
||||
use oxydsp_dsp::blocks::math::basic::Adder;
|
||||
use oxydsp_dsp::blocks::math::basic::Multiplier;
|
||||
use oxydsp_dsp::blocks::synthesis::Nco;
|
||||
use oxydsp_dsp::blocks::synthesis::OscillatorSource;
|
||||
use oxydsp_dsp::blocks::ted::early_late::EarlyLateGate;
|
||||
use oxydsp_dsp::blocks::utilities::adapters::Map;
|
||||
use oxydsp_dsp::blocks::utilities::adapters::MapResult;
|
||||
use oxydsp_dsp::blocks::utilities::adapters::MapResultTagged;
|
||||
use oxydsp_dsp::blocks::utilities::adapters::NullSink;
|
||||
use oxydsp_dsp::blocks::utilities::adapters::Repeat;
|
||||
use oxydsp_dsp::blocks::utilities::adapters::Scan;
|
||||
use oxydsp_dsp::blocks::utilities::channels::TxSink;
|
||||
use oxydsp_dsp::blocks::utilities::iter::IterSource;
|
||||
use oxydsp_dsp::filtering::fir::Fir;
|
||||
use oxydsp_dsp::units::DigitalFrequency;
|
||||
use oxydsp_flowgraph::BlockIO;
|
||||
use oxydsp_flowgraph::block::Block;
|
||||
@ -170,6 +183,11 @@ impl<T: 'static + Display> Printer<T>
|
||||
}
|
||||
}
|
||||
|
||||
fn main()
|
||||
{
|
||||
main_demod()
|
||||
}
|
||||
|
||||
fn main_tst()
|
||||
{
|
||||
let (sourcea, a) = SourceTag::new("valuea".to_string());
|
||||
@ -181,22 +199,140 @@ fn main_tst()
|
||||
let _ = fg.run().join();
|
||||
}
|
||||
|
||||
fn main()
|
||||
{
|
||||
let sample_rate = 48_000;
|
||||
let sample_per_symbol = 96;
|
||||
let deviation = DigitalFrequency::from_time_frequency(500., sample_rate as f64);
|
||||
let carrier = DigitalFrequency::from_time_frequency(1000., sample_rate as f64);
|
||||
const SAMPLE_RATE: usize = 48_000;
|
||||
const SAMPLE_PER_SYMBOL: usize = 96;
|
||||
const DEVIATION: f64 = 500.;
|
||||
const CARRIER: f64 = 1000.;
|
||||
|
||||
fn main_demod()
|
||||
{
|
||||
let carrier = DigitalFrequency::from_time_frequency(CARRIER, SAMPLE_RATE as f64);
|
||||
|
||||
let mut reader = hound::WavReader::open("mod.wav").unwrap();
|
||||
let sqr_sum = reader
|
||||
.samples::<i16>()
|
||||
.map(|sample| (sample.unwrap() as f32) / (i16::MAX as f32))
|
||||
.collect::<Vec<_>>();
|
||||
let (iter_source, signal) = IterSource::new(sqr_sum.into_iter());
|
||||
|
||||
// Make an iq sampler
|
||||
let (lo, lo_signal) = OscillatorSource::new(carrier.into());
|
||||
let (mixer, iq) = Multiplier::new(signal, lo_signal);
|
||||
let (iq_bandpass, iq) = FirFilter::<Complex<f32>, Complex<f32>, Complex<f32>>::new(
|
||||
iq,
|
||||
Fir::lowpass(carrier, 100).normalized(),
|
||||
);
|
||||
let (arg_extract, arg) = Scan::new(iq, Complex::zero(), |state, sample| {
|
||||
let angle = *state / sample;
|
||||
*state = sample;
|
||||
angle.arg()
|
||||
});
|
||||
let (sig_lowpass, arg) =
|
||||
FirFilter::<f32, f32, f32>::new(arg, Fir(vec![1.; SAMPLE_PER_SYMBOL]).normalized());
|
||||
let (tx, rx) = mpsc::channel();
|
||||
|
||||
let elg_loop = Fir(vec![1.0f32; 10]);
|
||||
let mut elg_loop = elg_loop.normalized();
|
||||
elg_loop.0[0] = 0.5;
|
||||
let (elg, arg) = EarlyLateGate::new(arg, elg_loop, SAMPLE_PER_SYMBOL);
|
||||
let (sender, arg) = MapResultTagged::new(arg, move |x| {
|
||||
let _ = tx.send((
|
||||
x.0,
|
||||
x.1.as_ref()
|
||||
.is_some_and(|t| t.retrieve("elg_symbol").is_some()),
|
||||
));
|
||||
if x.1
|
||||
.is_some_and(|t| t.retrieve("itersource_finished").is_some())
|
||||
{
|
||||
println!("FINISHED !");
|
||||
(x.0.into(), BlockResult::Exit)
|
||||
}
|
||||
else
|
||||
{
|
||||
(x.0.into(), BlockResult::Ok)
|
||||
}
|
||||
});
|
||||
let null_sink = NullSink::new(arg);
|
||||
|
||||
let graph = flowgraph![
|
||||
iter_source,
|
||||
lo,
|
||||
mixer,
|
||||
iq_bandpass,
|
||||
arg_extract,
|
||||
sig_lowpass,
|
||||
elg,
|
||||
sender,
|
||||
null_sink
|
||||
];
|
||||
let j = graph.run();
|
||||
|
||||
let mut output = vec![];
|
||||
while let Ok(x) = rx.recv()
|
||||
{
|
||||
output.push(x);
|
||||
}
|
||||
let _ = j.join();
|
||||
|
||||
eframe::run_simple_native("Plot", NativeOptions::default(), move |ctx, _frame| {
|
||||
egui::CentralPanel::default().show(ctx, |ui| {
|
||||
egui_plot::Plot::new("hello").show(ui, |plot_ui| {
|
||||
plot_ui.line(Line::new(
|
||||
"samples",
|
||||
output
|
||||
.iter()
|
||||
.enumerate()
|
||||
.map(|(i, s)| [i as f64, s.0 as f64])
|
||||
.collect::<PlotPoints>(),
|
||||
));
|
||||
|
||||
plot_ui.points(
|
||||
Points::new(
|
||||
"symbols",
|
||||
output
|
||||
.iter()
|
||||
.enumerate()
|
||||
.filter(|(_, (_, x))| *x)
|
||||
.map(|(i, (s, _))| [i as f64, *s as f64])
|
||||
.collect::<Vec<_>>(),
|
||||
)
|
||||
.id("symbols")
|
||||
.radius(5.)
|
||||
.shape(MarkerShape::Diamond),
|
||||
);
|
||||
});
|
||||
ctx.request_repaint();
|
||||
});
|
||||
})
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
fn main_mod()
|
||||
{
|
||||
let carrier = DigitalFrequency::from_time_frequency(CARRIER, SAMPLE_RATE as f64);
|
||||
let deviation = DigitalFrequency::from_time_frequency(DEVIATION, SAMPLE_RATE as f64);
|
||||
let data = (0..255u8).flat_map(to_bits).collect::<Vec<_>>();
|
||||
let (bit_stream, bits) = IterSource::new(data.into_iter());
|
||||
let (to_freq, freq) = Map::new(bits, move |x| [-deviation, deviation][x as usize]);
|
||||
let (repeat, freq) = Repeat::new(freq, sample_per_symbol);
|
||||
let (repeat, freq) = Repeat::new(freq, SAMPLE_PER_SYMBOL);
|
||||
let (base_oscillator, baseband) = Nco::<f32>::new(freq);
|
||||
let (local_oscillator, lo) = OscillatorSource::<f32>::new(carrier.into());
|
||||
let (frontend, passband) = Multiplier::new(baseband, lo);
|
||||
let (tx, rx) = mpsc::channel::<Complex<f32>>();
|
||||
let sink = TxSink::new(passband, tx);
|
||||
let (sender, passband) = MapResultTagged::new(passband, move |x| {
|
||||
let _ = tx.send(x.0);
|
||||
if x.1
|
||||
.is_some_and(|t| t.retrieve("itersource_finished").is_some())
|
||||
{
|
||||
println!("FINISHED !");
|
||||
(x.0.into(), BlockResult::Exit)
|
||||
}
|
||||
else
|
||||
{
|
||||
(x.0.into(), BlockResult::Ok)
|
||||
}
|
||||
});
|
||||
let null_sink = NullSink::new(passband);
|
||||
|
||||
let graph = flowgraph![
|
||||
bit_stream,
|
||||
@ -205,7 +341,8 @@ fn main()
|
||||
base_oscillator,
|
||||
local_oscillator,
|
||||
frontend,
|
||||
sink,
|
||||
sender,
|
||||
null_sink,
|
||||
];
|
||||
File::create("out.dot")
|
||||
.unwrap()
|
||||
@ -222,7 +359,7 @@ fn main()
|
||||
// Write signal
|
||||
let spec = hound::WavSpec {
|
||||
channels: 1,
|
||||
sample_rate,
|
||||
sample_rate: SAMPLE_RATE as u32,
|
||||
bits_per_sample: 16,
|
||||
sample_format: hound::SampleFormat::Int,
|
||||
};
|
||||
|
||||
@ -1,3 +1,5 @@
|
||||
pub mod filtering;
|
||||
pub mod math;
|
||||
pub mod synthesis;
|
||||
pub mod ted;
|
||||
pub mod utilities;
|
||||
|
||||
1
oxydsp-dsp/src/blocks/filtering.rs
Normal file
1
oxydsp-dsp/src/blocks/filtering.rs
Normal file
@ -0,0 +1 @@
|
||||
pub mod fir;
|
||||
58
oxydsp-dsp/src/blocks/filtering/fir.rs
Normal file
58
oxydsp-dsp/src/blocks/filtering/fir.rs
Normal file
@ -0,0 +1,58 @@
|
||||
use oxydsp_flowgraph::BlockIO;
|
||||
use oxydsp_flowgraph::block::SyncBlock;
|
||||
use oxydsp_flowgraph::io::In;
|
||||
use oxydsp_flowgraph::io::Out;
|
||||
use oxydsp_flowgraph::sync_block;
|
||||
use std::iter::Sum;
|
||||
use std::ops::Mul;
|
||||
|
||||
use crate::filtering::fir::Fir;
|
||||
|
||||
#[derive(BlockIO)]
|
||||
#[sync_block]
|
||||
pub struct FirFilter<F, T, O>
|
||||
where
|
||||
T: Clone + 'static,
|
||||
F: Mul<T, Output = O> + Clone + 'static,
|
||||
O: Sum + 'static,
|
||||
{
|
||||
#[input]
|
||||
input: In<T>,
|
||||
|
||||
#[output]
|
||||
output: Out<O>,
|
||||
|
||||
filter: crate::filtering::fir::FirFilter<F, T, O>,
|
||||
}
|
||||
|
||||
impl<F, T, O> FirFilter<F, T, O>
|
||||
where
|
||||
T: Clone + 'static,
|
||||
F: Mul<T, Output = O> + Clone + 'static,
|
||||
O: Sum + 'static,
|
||||
{
|
||||
pub fn new(input: In<T>, impulse_response: Fir<F>) -> (Self, In<O>)
|
||||
{
|
||||
let (output, filtered) = oxydsp_flowgraph::io::stream();
|
||||
(
|
||||
Self {
|
||||
input,
|
||||
output,
|
||||
filter: crate::filtering::fir::FirFilter::new(impulse_response),
|
||||
},
|
||||
filtered,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'view, F, T, O> SyncBlock<'view> for FirFilter<F, T, O>
|
||||
where
|
||||
T: Clone + 'view,
|
||||
F: Mul<T, Output = O> + Clone + 'static,
|
||||
O: Sum + 'static,
|
||||
{
|
||||
fn sync_work(state: Self::StateView, input: Self::Input) -> Option<Self::Output>
|
||||
{
|
||||
Some(state.filter.next(input))
|
||||
}
|
||||
}
|
||||
@ -12,7 +12,7 @@ use oxydsp_flowgraph::sync_block;
|
||||
use oxydsp_flowgraph::tag::TagMergable;
|
||||
|
||||
#[derive(BlockIO)]
|
||||
#[sync_block(tagged)]
|
||||
#[sync_block]
|
||||
pub struct Adder<Ia, Ib, O>
|
||||
where
|
||||
Ia: Add<Ib, Output = O> + 'static,
|
||||
|
||||
@ -31,7 +31,8 @@ impl<T: Float + From<f32> + 'static> Block for OscillatorSource<T>
|
||||
{
|
||||
fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult
|
||||
{
|
||||
self.output.push_iter((&mut self.nco).map(|x| (x, None)));
|
||||
self.output
|
||||
.push_iter((&mut self.nco).map(|x| (x, None).into()));
|
||||
BlockResult::Ok
|
||||
}
|
||||
}
|
||||
@ -82,7 +83,7 @@ impl<T: Float + From<f32> + 'static> Block for Nco<T>
|
||||
self.output
|
||||
.push_iter(&mut self.frequency.pop_iter().map(|f| {
|
||||
self.nco.set_frequency(f.0);
|
||||
(self.nco.next().unwrap(), f.1)
|
||||
(self.nco.next().unwrap(), f.1).into()
|
||||
}));
|
||||
BlockResult::Ok
|
||||
}
|
||||
|
||||
1
oxydsp-dsp/src/blocks/ted.rs
Normal file
1
oxydsp-dsp/src/blocks/ted.rs
Normal file
@ -0,0 +1 @@
|
||||
pub mod early_late;
|
||||
111
oxydsp-dsp/src/blocks/ted/early_late.rs
Normal file
111
oxydsp-dsp/src/blocks/ted/early_late.rs
Normal file
@ -0,0 +1,111 @@
|
||||
use std::collections::VecDeque;
|
||||
use std::iter::Sum;
|
||||
|
||||
use num::Float;
|
||||
use oxydsp_flowgraph::BlockIO;
|
||||
use oxydsp_flowgraph::block::SyncBlock;
|
||||
use oxydsp_flowgraph::io::In;
|
||||
use oxydsp_flowgraph::io::Out;
|
||||
use oxydsp_flowgraph::sync_block;
|
||||
use oxydsp_flowgraph::tag::Tag;
|
||||
|
||||
use crate::filtering::fir::Fir;
|
||||
use crate::filtering::fir::FirFilter;
|
||||
|
||||
#[derive(BlockIO)]
|
||||
#[sync_block(tagged)]
|
||||
pub struct EarlyLateGate<T: Float + Sum + Clone + 'static>
|
||||
{
|
||||
#[input]
|
||||
input: In<T>,
|
||||
|
||||
#[output]
|
||||
output: Out<T>,
|
||||
|
||||
symbol_length: usize,
|
||||
|
||||
// Window looking at symbol_length samples at a time
|
||||
window: VecDeque<T>,
|
||||
|
||||
// The current location of the window, in relation to the last sample
|
||||
window_location: usize,
|
||||
|
||||
window_center: usize,
|
||||
|
||||
// The next window location, in relation to the last sample such that the window is centered on
|
||||
// a symbol center (hopefully)
|
||||
next_sample: usize,
|
||||
loop_filter: FirFilter<T, T, T>,
|
||||
}
|
||||
|
||||
impl<T> EarlyLateGate<T>
|
||||
where
|
||||
T: Float + Sum + Clone + 'static,
|
||||
{
|
||||
pub fn new(input: In<T>, loop_filter: Fir<T>, symbol_length: usize) -> (Self, In<T>)
|
||||
{
|
||||
let (output, samples) = oxydsp_flowgraph::io::stream();
|
||||
(
|
||||
Self {
|
||||
input,
|
||||
output,
|
||||
window: VecDeque::with_capacity(symbol_length),
|
||||
symbol_length,
|
||||
window_location: 0,
|
||||
window_center: symbol_length / 2,
|
||||
next_sample: symbol_length, // We assume that the first symbol is 1.5 windows into
|
||||
// the stream
|
||||
loop_filter: FirFilter::new(loop_filter),
|
||||
},
|
||||
samples,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'view, T> SyncBlock<'view> for EarlyLateGate<T>
|
||||
where
|
||||
T: Float + Sum + Clone + 'static,
|
||||
{
|
||||
fn sync_work(state: Self::StateView, input: Self::Input) -> Option<Self::Output>
|
||||
{
|
||||
if state.window.len() < *state.symbol_length
|
||||
{
|
||||
state.window.push_back(input.0);
|
||||
return Some(input.0.into());
|
||||
}
|
||||
|
||||
// Bring new sample in
|
||||
state.window.pop_front();
|
||||
state.window.push_back(input.0);
|
||||
*state.window_location += 1;
|
||||
|
||||
let sample = state.window[*state.window_center];
|
||||
let mut tag = None;
|
||||
if *state.window_location >= *state.next_sample
|
||||
{
|
||||
let new_tag = Tag::default();
|
||||
new_tag.tag("elg_symbol", ());
|
||||
tag = Some(new_tag);
|
||||
|
||||
let early_index = *state.window_center - (0.25 * *state.symbol_length as f32) as usize;
|
||||
let late_index = *state.window_center + (0.25 * *state.symbol_length as f32) as usize;
|
||||
|
||||
let early_sample = state.window[early_index];
|
||||
let late_sample = state.window[late_index];
|
||||
|
||||
let error = (late_sample - early_sample) * sample;
|
||||
let correction = state.loop_filter.next(error);
|
||||
|
||||
// Figure out next sample location
|
||||
*state.next_sample += (*state.symbol_length as isize
|
||||
+ correction.floor().to_isize().unwrap_or(0))
|
||||
.max(0) as usize;
|
||||
|
||||
// Turn everything back relative to current sample
|
||||
*state.next_sample -= *state.window_location;
|
||||
*state.window_location = 0;
|
||||
}
|
||||
|
||||
Some((sample, tag).into())
|
||||
}
|
||||
}
|
||||
@ -1,11 +1,17 @@
|
||||
use core::sync;
|
||||
|
||||
use oxydsp_flowgraph::BlockIO;
|
||||
use oxydsp_flowgraph::block::Block;
|
||||
use oxydsp_flowgraph::block::BlockResult;
|
||||
use oxydsp_flowgraph::block::SyncBlock;
|
||||
use oxydsp_flowgraph::io::In;
|
||||
use oxydsp_flowgraph::io::Out;
|
||||
use oxydsp_flowgraph::io::PopIterable;
|
||||
use oxydsp_flowgraph::io::stream;
|
||||
use oxydsp_flowgraph::sync_block;
|
||||
use oxydsp_flowgraph::tag::Tag;
|
||||
use oxydsp_flowgraph::tag::TagMergable;
|
||||
use oxydsp_flowgraph::tag::Tagged;
|
||||
|
||||
#[derive(BlockIO)]
|
||||
pub struct Map<I: 'static, O: 'static, F>
|
||||
@ -36,12 +42,168 @@ where
|
||||
{
|
||||
fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult
|
||||
{
|
||||
self.output
|
||||
.push_iter(self.input.pop_iter().map(|x| ((&self.map)(x.0), x.1)));
|
||||
self.output.push_iter(
|
||||
self.input
|
||||
.pop_iter()
|
||||
.map(|x| ((&self.map)(x.0), x.1).into()),
|
||||
);
|
||||
BlockResult::Ok
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(BlockIO)]
|
||||
pub struct MapResult<I: 'static, O: 'static, F>
|
||||
{
|
||||
#[input]
|
||||
input: In<I>,
|
||||
|
||||
#[output]
|
||||
output: Out<O>,
|
||||
|
||||
map: F,
|
||||
}
|
||||
|
||||
impl<I: 'static, O: 'static, F> MapResult<I, O, F>
|
||||
where
|
||||
F: Fn(I) -> (O, BlockResult),
|
||||
{
|
||||
pub fn new(input: In<I>, map: F) -> (Self, In<O>)
|
||||
{
|
||||
let (output, mapped) = stream();
|
||||
(Self { input, output, map }, mapped)
|
||||
}
|
||||
}
|
||||
|
||||
impl<I: 'static, O: 'static, F> Block for MapResult<I, O, F>
|
||||
where
|
||||
F: Fn(I) -> (O, BlockResult),
|
||||
{
|
||||
fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult
|
||||
{
|
||||
let writer = self.output.write();
|
||||
let reader = self.input.read();
|
||||
|
||||
for _ in 0..(writer.len().min(reader.len()))
|
||||
{
|
||||
let (input, tag_opt) = reader.pop().unwrap().into();
|
||||
let (output, result) = (self.map)(input);
|
||||
let _ = writer.push((output, tag_opt).into());
|
||||
match result
|
||||
{
|
||||
BlockResult::Terminated | BlockResult::Exit =>
|
||||
{
|
||||
return result;
|
||||
}
|
||||
BlockResult::Ok =>
|
||||
{}
|
||||
}
|
||||
}
|
||||
BlockResult::Ok
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(BlockIO)]
|
||||
pub struct MapResultTagged<I: 'static, O: 'static, F>
|
||||
{
|
||||
#[input]
|
||||
input: In<I>,
|
||||
|
||||
#[output]
|
||||
output: Out<O>,
|
||||
|
||||
map: F,
|
||||
}
|
||||
|
||||
impl<I: 'static, O: 'static, F> MapResultTagged<I, O, F>
|
||||
where
|
||||
F: Fn(Tagged<I>) -> (Tagged<O>, BlockResult),
|
||||
{
|
||||
pub fn new(input: In<I>, map: F) -> (Self, In<O>)
|
||||
{
|
||||
let (output, mapped) = stream();
|
||||
(Self { input, output, map }, mapped)
|
||||
}
|
||||
}
|
||||
|
||||
impl<I: 'static, O: 'static, F> Block for MapResultTagged<I, O, F>
|
||||
where
|
||||
F: Fn(Tagged<I>) -> (Tagged<O>, BlockResult),
|
||||
{
|
||||
fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult
|
||||
{
|
||||
let writer = self.output.write();
|
||||
let reader = self.input.read();
|
||||
|
||||
for _ in 0..(writer.len().min(reader.len()))
|
||||
{
|
||||
let (input, tag_opt) = reader.pop().unwrap().into();
|
||||
let (tagged_out, result) = (self.map)((input, tag_opt.clone()).into());
|
||||
let (output, tag_out) = tagged_out.into();
|
||||
|
||||
let _ = writer.push((output, tag_opt.merge(&tag_out)).into());
|
||||
match result
|
||||
{
|
||||
BlockResult::Terminated | BlockResult::Exit =>
|
||||
{
|
||||
return result;
|
||||
}
|
||||
BlockResult::Ok =>
|
||||
{}
|
||||
}
|
||||
}
|
||||
BlockResult::Ok
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(BlockIO)]
|
||||
#[sync_block]
|
||||
pub struct Scan<I: 'static, O: 'static, S, F>
|
||||
where
|
||||
F: Fn(&mut S, I) -> O,
|
||||
{
|
||||
#[input]
|
||||
input: In<I>,
|
||||
|
||||
#[output]
|
||||
output: Out<O>,
|
||||
|
||||
state: S,
|
||||
|
||||
map: F,
|
||||
}
|
||||
|
||||
impl<I: 'static, O: 'static, S, F> Scan<I, O, S, F>
|
||||
where
|
||||
F: Fn(&mut S, I) -> O,
|
||||
{
|
||||
pub fn new(input: In<I>, initial_state: S, map: F) -> (Self, In<O>)
|
||||
{
|
||||
let (output, mapped) = stream();
|
||||
(
|
||||
Self {
|
||||
input,
|
||||
output,
|
||||
state: initial_state,
|
||||
map,
|
||||
},
|
||||
mapped,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'view, I, O, S, F> SyncBlock<'view> for Scan<I, O, S, F>
|
||||
where
|
||||
I: 'static,
|
||||
O: 'static,
|
||||
S: 'view,
|
||||
F: Fn(&mut S, I) -> O + 'view,
|
||||
{
|
||||
fn sync_work(state: Self::StateView, input: Self::Input) -> Option<Self::Output>
|
||||
{
|
||||
Some((*state.map)(state.state, input))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(BlockIO)]
|
||||
pub struct Repeat<T: 'static>
|
||||
{
|
||||
@ -110,3 +272,28 @@ impl<T: Clone + 'static> Block for Repeat<T>
|
||||
BlockResult::Ok
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(BlockIO)]
|
||||
#[sync_block]
|
||||
pub struct NullSink<T: 'static>
|
||||
{
|
||||
#[input]
|
||||
input: In<T>,
|
||||
}
|
||||
|
||||
impl<T: 'static> NullSink<T>
|
||||
{
|
||||
pub fn new(input: In<T>) -> Self
|
||||
{
|
||||
Self { input }
|
||||
}
|
||||
}
|
||||
|
||||
impl<'view, I: 'static> SyncBlock<'view> for NullSink<I>
|
||||
{
|
||||
fn sync_work(_: Self::StateView, _: Self::Input) -> Option<Self::Output>
|
||||
{
|
||||
// Don't do shit !
|
||||
Some(())
|
||||
}
|
||||
}
|
||||
|
||||
@ -49,7 +49,9 @@ impl<I: 'static> Block for RxSource<Receiver<I>, I>
|
||||
{
|
||||
fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult
|
||||
{
|
||||
if self.output.push_iter(self.input.iter().map(|x| (x, None)))
|
||||
if self
|
||||
.output
|
||||
.push_iter(self.input.iter().map(|x| (x, None).into()))
|
||||
{
|
||||
BlockResult::Ok
|
||||
}
|
||||
|
||||
@ -1,15 +1,21 @@
|
||||
use oxydsp_flowgraph::{
|
||||
BlockIO,
|
||||
block::{Block, BlockResult},
|
||||
io::{In, Out, stream},
|
||||
};
|
||||
use std::iter::Peekable;
|
||||
|
||||
use oxydsp_flowgraph::BlockIO;
|
||||
use oxydsp_flowgraph::block::Block;
|
||||
use oxydsp_flowgraph::block::BlockResult;
|
||||
use oxydsp_flowgraph::block::SyncBlock;
|
||||
use oxydsp_flowgraph::io::In;
|
||||
use oxydsp_flowgraph::io::Out;
|
||||
use oxydsp_flowgraph::io::stream;
|
||||
use oxydsp_flowgraph::sync_block;
|
||||
use oxydsp_flowgraph::tag::Tag;
|
||||
|
||||
#[derive(BlockIO)]
|
||||
pub struct IterSource<I: Iterator>
|
||||
where
|
||||
I::Item: 'static,
|
||||
{
|
||||
iter: I,
|
||||
iter: Peekable<I>,
|
||||
|
||||
#[output]
|
||||
output: Out<I::Item>,
|
||||
@ -23,7 +29,13 @@ where
|
||||
pub fn new(iter: I) -> (Self, In<I::Item>)
|
||||
{
|
||||
let (output, items) = stream();
|
||||
(Self { iter, output }, items)
|
||||
(
|
||||
Self {
|
||||
iter: iter.peekable(),
|
||||
output,
|
||||
},
|
||||
items,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@ -34,13 +46,22 @@ where
|
||||
{
|
||||
fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult
|
||||
{
|
||||
if self.output.push_iter((&mut self.iter).map(|x| (x, None)))
|
||||
let writer = self.output.write();
|
||||
|
||||
for _ in 0..writer.len()
|
||||
{
|
||||
BlockResult::Ok
|
||||
}
|
||||
else
|
||||
{
|
||||
BlockResult::Terminated
|
||||
if let Some(element) = self.iter.next()
|
||||
{
|
||||
let mut tag = None;
|
||||
if self.iter.peek().is_none()
|
||||
{
|
||||
let new_tag = Tag::default();
|
||||
new_tag.tag("itersource_finished", ());
|
||||
tag = Some(new_tag);
|
||||
}
|
||||
let _ = writer.push((element, tag).into());
|
||||
}
|
||||
}
|
||||
BlockResult::Ok
|
||||
}
|
||||
}
|
||||
|
||||
1
oxydsp-dsp/src/filtering.rs
Normal file
1
oxydsp-dsp/src/filtering.rs
Normal file
@ -0,0 +1 @@
|
||||
pub mod fir;
|
||||
120
oxydsp-dsp/src/filtering/fir.rs
Normal file
120
oxydsp-dsp/src/filtering/fir.rs
Normal file
@ -0,0 +1,120 @@
|
||||
use std::collections::VecDeque;
|
||||
use std::f64::consts::PI;
|
||||
use std::iter::Sum;
|
||||
use std::ops::Div;
|
||||
use std::ops::Mul;
|
||||
use std::process::Output;
|
||||
|
||||
use num::Complex;
|
||||
use num::Float;
|
||||
use num::One;
|
||||
use num::Zero;
|
||||
use num::complex::ComplexFloat;
|
||||
use num::zero;
|
||||
use rustfft::FftNum;
|
||||
use rustfft::FftPlanner;
|
||||
|
||||
use crate::map;
|
||||
use crate::units::DigitalFrequency;
|
||||
|
||||
/// Finite impulse response
|
||||
pub struct Fir<T>(pub Vec<T>);
|
||||
|
||||
impl<T> Fir<Complex<T>>
|
||||
where
|
||||
T: FftNum + Float + Clone,
|
||||
{
|
||||
pub fn from_transfer_function(tf: impl AsRef<[Complex<T>]>) -> Fir<Complex<T>>
|
||||
{
|
||||
let mut planner = FftPlanner::new();
|
||||
let tf_len = tf.as_ref().len();
|
||||
let ifft = planner.plan_fft_inverse(tf.as_ref().len());
|
||||
|
||||
let mut fir = tf.as_ref().to_vec();
|
||||
ifft.process(fir.as_mut_slice());
|
||||
|
||||
let mut shifted_fir = vec![];
|
||||
for i in 0..tf_len
|
||||
{
|
||||
let k = (tf_len - (tf_len / 2) + i) % tf_len;
|
||||
shifted_fir.push(fir[k]);
|
||||
}
|
||||
|
||||
Fir(shifted_fir)
|
||||
}
|
||||
|
||||
pub fn lowpass(cutoff: DigitalFrequency, length: usize) -> Fir<Complex<T>>
|
||||
{
|
||||
let mut tf = vec![Complex::<T>::zero(); length];
|
||||
|
||||
let cutoff_bin = map(cutoff.as_rad(), 0., 2. * PI, 0., length as f64).floor() as usize;
|
||||
for i in 0..cutoff_bin
|
||||
{
|
||||
tf[i] = Complex::<T>::one();
|
||||
tf[length - i - 1] = Complex::<T>::one();
|
||||
}
|
||||
|
||||
Self::from_transfer_function(tf)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Fir<T>
|
||||
where
|
||||
T: ComplexFloat + Div<T::Real, Output = T> + Copy + Sum,
|
||||
T::Real: Float,
|
||||
{
|
||||
pub fn normalized(mut self) -> Self
|
||||
{
|
||||
let sum: T = self.0.iter().copied().sum();
|
||||
let len = Float::sqrt(sum.im() * sum.im() + sum.re() * sum.re());
|
||||
|
||||
self.0.iter_mut().for_each(|x| *x = *x / len);
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
pub struct FirFilter<F, T, O>
|
||||
where
|
||||
F: Mul<T, Output = O>,
|
||||
O: Sum,
|
||||
{
|
||||
fir: Vec<F>,
|
||||
taps: VecDeque<T>,
|
||||
}
|
||||
|
||||
impl<F, T, O> FirFilter<F, T, O>
|
||||
where
|
||||
T: Clone,
|
||||
F: Mul<T, Output = O> + Clone,
|
||||
O: Sum,
|
||||
{
|
||||
pub fn new(impulse_response: Fir<F>) -> Self
|
||||
{
|
||||
let len = impulse_response.0.len();
|
||||
Self {
|
||||
fir: impulse_response.0,
|
||||
taps: VecDeque::with_capacity(len),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn next(&mut self, input: T) -> O
|
||||
{
|
||||
if self.taps.len() == self.fir.len()
|
||||
{
|
||||
self.taps.pop_front();
|
||||
}
|
||||
self.taps.push_back(input);
|
||||
|
||||
self.fir
|
||||
.iter()
|
||||
.zip(self.taps.iter())
|
||||
.map(|(a, b)| a.clone() * b.clone())
|
||||
.sum()
|
||||
}
|
||||
}
|
||||
|
||||
// Completely stolen from sdrpp code
|
||||
pub fn estimate_fir_length(transition_width: f64, sample_rate: f64) -> f64
|
||||
{
|
||||
3.8 * sample_rate / transition_width
|
||||
}
|
||||
@ -1,6 +1,7 @@
|
||||
use num::Float;
|
||||
|
||||
pub mod blocks;
|
||||
pub mod filtering;
|
||||
pub mod synthesis;
|
||||
pub mod units;
|
||||
|
||||
|
||||
@ -1,4 +1,5 @@
|
||||
use std::{f64::consts::PI, ops::Neg};
|
||||
use std::f64::consts::PI;
|
||||
use std::ops::Neg;
|
||||
|
||||
use crate::map;
|
||||
|
||||
|
||||
@ -8,6 +8,7 @@ use zyn::ext::TypeExt;
|
||||
use zyn::quote::quote;
|
||||
use zyn::syn::Field;
|
||||
use zyn::syn::GenericParam;
|
||||
use zyn::syn::Lifetime;
|
||||
use zyn::syn::parse_quote;
|
||||
|
||||
use crate::SyncBlockConfig;
|
||||
@ -23,28 +24,41 @@ pub fn sync_block_impl(item: zyn::syn::ItemStruct, config: SyncBlockConfig) -> z
|
||||
// module to keep everything clean
|
||||
mod {{ item.ident | snake | ident: "{}_synchronous_block" }}
|
||||
{
|
||||
use super::*;
|
||||
|
||||
@sync_block_view_struct(item = item.clone())
|
||||
}
|
||||
|
||||
@sync_block_syncio_impl(item = item.clone(), config = *config)
|
||||
@sync_block_impl_block(item = item.clone())
|
||||
@sync_block_impl_block(item = item.clone(), tagged = config.tagged)
|
||||
)
|
||||
}
|
||||
|
||||
// Block implementation for sync block
|
||||
#[zyn::element]
|
||||
fn sync_block_syncio_impl(item: zyn::syn::ItemStruct, config: SyncBlockConfig) -> zyn::TokenStream
|
||||
{
|
||||
let view_lifetime: GenericParam = parse_quote!('view);
|
||||
let mut view_generics = item.generics.clone();
|
||||
view_generics.params.iter_mut().for_each(|x| match x
|
||||
{
|
||||
GenericParam::Lifetime(_) =>
|
||||
{}
|
||||
GenericParam::Type(type_param) => type_param
|
||||
.bounds
|
||||
.push(zyn::syn::TypeParamBound::Lifetime(parse_quote!('view))),
|
||||
GenericParam::Const(_) =>
|
||||
{}
|
||||
});
|
||||
view_generics.params.insert(0, view_lifetime);
|
||||
|
||||
let (view_impl_generics, view_type_generics, _view_where_clause) =
|
||||
let (view_impl_generics, view_type_generics, view_where_clause) =
|
||||
view_generics.split_for_impl();
|
||||
let (_impl_generics, type_generics, where_clause) = item.generics.split_for_impl();
|
||||
let (_impl_generics, type_generics, _where_clause) = item.generics.split_for_impl();
|
||||
|
||||
zyn::zyn!(
|
||||
impl {{ view_impl_generics }} oxydsp_flowgraph::block::SyncBlockIO<'view> for {{ item.ident }} {{ type_generics }}
|
||||
{{ where_clause }}
|
||||
{{ view_where_clause }}
|
||||
{
|
||||
// Path within module
|
||||
type StateView = {{ item.ident | snake | ident: "{}_synchronous_block" }}::{{ item.ident | ident:"{}View" }} {{ view_type_generics }};
|
||||
@ -54,6 +68,7 @@ fn sync_block_syncio_impl(item: zyn::syn::ItemStruct, config: SyncBlockConfig) -
|
||||
)
|
||||
}
|
||||
|
||||
// Input/Output types for block
|
||||
fn sync_block_io_types(
|
||||
item: zyn::syn::ItemStruct,
|
||||
io: &'static str,
|
||||
@ -116,15 +131,26 @@ fn sync_block_io_types(
|
||||
.into_token_stream()
|
||||
}
|
||||
|
||||
// View struct for sync block
|
||||
#[zyn::element]
|
||||
fn sync_block_view_struct(item: zyn::syn::ItemStruct) -> zyn::TokenStream
|
||||
{
|
||||
// Create view liftime to add to struct definition
|
||||
let lifetime: GenericParam = parse_quote!('view);
|
||||
let mut generics = item.generics.clone();
|
||||
generics.params.iter_mut().for_each(|x| match x
|
||||
{
|
||||
GenericParam::Lifetime(_) =>
|
||||
{}
|
||||
GenericParam::Type(type_param) => type_param
|
||||
.bounds
|
||||
.push(zyn::syn::TypeParamBound::Lifetime(parse_quote!('view))),
|
||||
GenericParam::Const(_) =>
|
||||
{}
|
||||
});
|
||||
generics.params.insert(0, lifetime);
|
||||
|
||||
let (_impl_generics, type_generics, where_clause) = generics.split_for_impl();
|
||||
let (impl_generics, _type_generics, where_clause) = generics.split_for_impl();
|
||||
let fields = &item.fields.as_named().unwrap().named;
|
||||
|
||||
let mut state_fields = vec![];
|
||||
@ -169,7 +195,8 @@ fn sync_block_view_struct(item: zyn::syn::ItemStruct) -> zyn::TokenStream
|
||||
.filter(|tokens| !tokens.is_empty());
|
||||
|
||||
zyn::zyn!(
|
||||
pub struct {{ item.ident | ident:"{}View" }} {{ type_generics }}
|
||||
pub struct {{ item.ident | ident:"{}View" }} {{ impl_generics }}
|
||||
{{ where_clause }}
|
||||
{
|
||||
@for (field in state_fields.iter())
|
||||
{
|
||||
@ -185,187 +212,7 @@ fn sync_block_view_struct(item: zyn::syn::ItemStruct) -> zyn::TokenStream
|
||||
)
|
||||
}
|
||||
|
||||
#[zyn::element]
|
||||
fn sync_block_impl_block(item: zyn::syn::ItemStruct) -> zyn::TokenStream
|
||||
{
|
||||
let item2 = item.clone();
|
||||
let (impl_generics, type_generics, where_clause) = item2.generics.split_for_impl();
|
||||
let fields = &item.fields.as_named().unwrap().named;
|
||||
|
||||
// Retrieve fields
|
||||
let input_fields = fields
|
||||
.iter()
|
||||
.filter(|f| f.attrs.iter().any(|attr| attr.is("input")))
|
||||
.cloned()
|
||||
.collect::<Vec<_>>();
|
||||
let output_fields = fields
|
||||
.iter()
|
||||
.filter(|f| f.attrs.iter().any(|attr| attr.is("output")))
|
||||
.cloned()
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
zyn::zyn!(
|
||||
impl {{ impl_generics }} oxydsp_flowgraph::block::Block for {{ item.ident }} {{ type_generics }}
|
||||
{{ where_clause }}
|
||||
{
|
||||
fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult
|
||||
{
|
||||
|
||||
// Get writers from outputs
|
||||
let mut max_len = usize::MAX;
|
||||
@for (out_field in output_fields.iter())
|
||||
{
|
||||
let {{ out_field.ident.clone().unwrap() | ident: "{}_writer"}} = self.{{ out_field.ident }}.write();
|
||||
}
|
||||
|
||||
// Compute max_len
|
||||
let max_len = [
|
||||
usize::MAX,
|
||||
@for (out_field in output_fields.iter())
|
||||
{
|
||||
{{ out_field.ident.clone().unwrap() | ident: "{}_writer"}}.len(),
|
||||
}
|
||||
].iter().min().unwrap();
|
||||
|
||||
|
||||
@if (!input_fields.is_empty())
|
||||
{
|
||||
@sync_block_block_impl_with_inputs(item = item.clone(), input_fields = input_fields.clone(), output_fields = output_fields.clone())
|
||||
}
|
||||
@else
|
||||
{
|
||||
@sync_block_block_impl_without_inputs(item = item.clone(), output_fields = output_fields.clone())
|
||||
}
|
||||
|
||||
oxydsp_flowgraph::block::BlockResult::Ok
|
||||
}
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
#[zyn::element]
|
||||
fn sync_block_block_impl_without_inputs(
|
||||
item: zyn::syn::ItemStruct,
|
||||
output_fields: Vec<Field>,
|
||||
) -> zyn::TokenStream
|
||||
{
|
||||
zyn::zyn!(
|
||||
for _ in 0..max_len
|
||||
{
|
||||
// Get outputs
|
||||
@if (output_fields.len() == 1)
|
||||
{
|
||||
let {{output_fields[0].ident.clone().unwrap() | ident:"{}_element"}}
|
||||
}
|
||||
@else
|
||||
{
|
||||
let state = {{ sync_block_make_view_struct(item.clone()) }};
|
||||
let (@for (out_field in output_fields.iter())
|
||||
{
|
||||
{{out_field.ident.clone().unwrap() | ident:"{}_element"}},
|
||||
}
|
||||
)
|
||||
}
|
||||
= <Self as oxydsp_flowgraph::block::SyncBlock>::sync_work(state, ()).unwrap();
|
||||
|
||||
// Now the output samples must be sent to their resepective outputs
|
||||
@for (out_field in output_fields.iter())
|
||||
{
|
||||
{{ out_field.ident.clone().unwrap() | ident: "{}_writer"}}.push(
|
||||
(
|
||||
{{ out_field.ident.clone().unwrap() | ident: "{}_element"}}, None
|
||||
)
|
||||
);
|
||||
}
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
#[zyn::element]
|
||||
fn sync_block_block_impl_with_inputs(
|
||||
item: zyn::syn::ItemStruct,
|
||||
input_fields: Vec<Field>,
|
||||
output_fields: Vec<Field>,
|
||||
) -> zyn::TokenStream
|
||||
{
|
||||
zyn::zyn!(
|
||||
|
||||
// Iterate on inputs
|
||||
(
|
||||
@for (in_field in input_fields.iter())
|
||||
{
|
||||
&mut self.{{ in_field.ident }},
|
||||
}
|
||||
).pop_iter()
|
||||
.take(max_len)
|
||||
.for_each(
|
||||
// Deconstruct foreach arguments
|
||||
|
|
||||
(@for (in_field in input_fields.iter())
|
||||
{
|
||||
oxydsp_flowgraph::tag::Tagged({{in_field.ident.clone().unwrap() | ident:"{}_element"}},
|
||||
{{in_field.ident.clone().unwrap() | ident:"{}_tag_opt"}}),
|
||||
})
|
||||
|
|
||||
{
|
||||
// Create output tag
|
||||
let common_tag = oxydsp_flowgraph::tag::Tag::merge_tag_opts([
|
||||
@for (in_field in input_fields.iter())
|
||||
{
|
||||
{{in_field.ident.clone().unwrap() | ident:"{}_tag_opt"}}.clone(),
|
||||
}
|
||||
]);
|
||||
|
||||
let state = {{ sync_block_make_view_struct(item.clone()) }};
|
||||
// Compute output sample
|
||||
@if (output_fields.is_empty())
|
||||
{
|
||||
let _
|
||||
}
|
||||
@else if (output_fields.len() == 1)
|
||||
{
|
||||
let oxydsp_flowgraph::tag::Tagged({{output_fields[0].ident.clone().unwrap() | ident:"{}_element"}}, {{output_fields[0].ident.clone().unwrap() | ident:"{}_tag_opt"}})
|
||||
}
|
||||
@else
|
||||
{
|
||||
let (@for (out_field in output_fields.iter())
|
||||
{
|
||||
oxydsp_flowgraph::tag::Tagged({{out_field.ident.clone().unwrap() | ident:"{}_element"}}, {{out_field.ident.clone().unwrap() | ident:"{}_tag_opt"}}),
|
||||
}
|
||||
)
|
||||
}
|
||||
= <Self as oxydsp_flowgraph::block::SyncBlock>::sync_work(state,
|
||||
@if (input_fields.len() == 1)
|
||||
{
|
||||
oxydsp_flowgraph::tag::Tagged({{input_fields[0].ident.clone().unwrap() | ident:"{}_element"}}, {{input_fields[0].ident.clone().unwrap() | ident:"{}_tag_opt"}}.clone())
|
||||
}
|
||||
@else
|
||||
{
|
||||
(@for (in_field in input_fields.iter())
|
||||
{
|
||||
oxydsp_flowgraph::tag::Tagged({{in_field.ident.clone().unwrap() | ident:"{}_element"}}, {{in_field.ident.clone().unwrap() | ident:"{}_tag_opt"}}.clone()),
|
||||
}
|
||||
)
|
||||
}
|
||||
).unwrap();
|
||||
|
||||
// Now the output samples must be sent to their resepective outputs
|
||||
@for (out_field in output_fields.iter())
|
||||
{
|
||||
{{ out_field.ident.clone().unwrap() | ident: "{}_writer"}}.push(
|
||||
oxydsp_flowgraph::tag::Tagged(
|
||||
{{ out_field.ident.clone().unwrap() | ident: "{}_element"}},
|
||||
{{ out_field.ident.clone().unwrap() | ident: "{}_tag_opt"}}.merge(common_tag),
|
||||
)
|
||||
);
|
||||
}
|
||||
//
|
||||
|
||||
}
|
||||
);
|
||||
)
|
||||
}
|
||||
|
||||
// Instantiates the view struct
|
||||
fn sync_block_make_view_struct(item: zyn::syn::ItemStruct) -> zyn::TokenStream
|
||||
{
|
||||
let fields = &item.fields.as_named().unwrap().named;
|
||||
@ -395,3 +242,282 @@ fn sync_block_make_view_struct(item: zyn::syn::ItemStruct) -> zyn::TokenStream
|
||||
)
|
||||
.into_token_stream()
|
||||
}
|
||||
|
||||
// Impl Block for syncio
|
||||
#[zyn::element]
|
||||
fn sync_block_impl_block(item: zyn::syn::ItemStruct, tagged: bool) -> zyn::TokenStream
|
||||
{
|
||||
let item2 = item.clone();
|
||||
let (impl_generics, type_generics, where_clause) = item2.generics.split_for_impl();
|
||||
let fields = &item.fields.as_named().unwrap().named;
|
||||
|
||||
// Retrieve fields
|
||||
let input_fields = fields
|
||||
.iter()
|
||||
.filter(|f| f.attrs.iter().any(|attr| attr.is("input")))
|
||||
.cloned()
|
||||
.collect::<Vec<_>>();
|
||||
let output_fields = fields
|
||||
.iter()
|
||||
.filter(|f| f.attrs.iter().any(|attr| attr.is("output")))
|
||||
.cloned()
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
zyn::zyn!(
|
||||
impl {{ impl_generics }} oxydsp_flowgraph::block::Block for {{ item.ident }} {{ type_generics }}
|
||||
{{ where_clause }}
|
||||
{
|
||||
fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult
|
||||
{
|
||||
use oxydsp_flowgraph::tag::TagMergable;
|
||||
// Get writers from outputs
|
||||
let mut max_len = usize::MAX;
|
||||
@for (out_field in output_fields.iter())
|
||||
{
|
||||
let {{ out_field.ident.clone().unwrap() | ident: "{}_writer"}} = self.{{ out_field.ident }}.write();
|
||||
}
|
||||
|
||||
// Compute max_len
|
||||
let max_len = *([
|
||||
usize::MAX,
|
||||
@for (out_field in output_fields.iter())
|
||||
{
|
||||
{{ out_field.ident.clone().unwrap() | ident: "{}_writer"}}.len(),
|
||||
}
|
||||
].iter().min().unwrap());
|
||||
|
||||
|
||||
@if (!input_fields.is_empty())
|
||||
{
|
||||
@sync_block_block_impl_with_inputs(item = item.clone(), input_fields = input_fields.clone(), output_fields = output_fields.clone(), tagged = *tagged)
|
||||
}
|
||||
@else
|
||||
{
|
||||
@sync_block_block_impl_without_inputs(item = item.clone(), output_fields = output_fields.clone(), tagged = *tagged)
|
||||
}
|
||||
|
||||
oxydsp_flowgraph::block::BlockResult::Ok
|
||||
}
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
// Impl Block for syncio (no inputs)
|
||||
#[zyn::element]
|
||||
fn sync_block_block_impl_without_inputs(
|
||||
item: zyn::syn::ItemStruct,
|
||||
output_fields: Vec<Field>,
|
||||
tagged: bool,
|
||||
) -> zyn::TokenStream
|
||||
{
|
||||
zyn::zyn!(
|
||||
for _ in 0..max_len
|
||||
{
|
||||
// Get outputs
|
||||
let state = {{ sync_block_make_view_struct(item.clone()) }};
|
||||
let @sync_block_output_descons(output_fields = output_fields.clone(), tagged = *tagged) =
|
||||
<Self as oxydsp_flowgraph::block::SyncBlock>::sync_work(state, ()).unwrap();
|
||||
|
||||
// Now the output samples must be sent to their resepective outputs
|
||||
@for (out_field in output_fields.iter())
|
||||
{
|
||||
@if (*tagged)
|
||||
{
|
||||
let _ = {{ out_field.ident.clone().unwrap() | ident: "{}_writer"}}.push(
|
||||
oxydsp_flowgraph::tag::Tagged(
|
||||
{{ out_field.ident.clone().unwrap() | ident: "{}_element"}},
|
||||
{{ out_field.ident.clone().unwrap() | ident: "{}_tag_opt"}},
|
||||
)
|
||||
);
|
||||
} @else
|
||||
{
|
||||
let _ = {{ out_field.ident.clone().unwrap() | ident: "{}_writer"}}.push(
|
||||
oxydsp_flowgraph::tag::Tagged(
|
||||
{{ out_field.ident.clone().unwrap() | ident: "{}_element"}},
|
||||
None,
|
||||
)
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
// Impl Block for syncio (with inputs)
|
||||
#[zyn::element]
|
||||
fn sync_block_block_impl_with_inputs(
|
||||
item: zyn::syn::ItemStruct,
|
||||
input_fields: Vec<Field>,
|
||||
output_fields: Vec<Field>,
|
||||
tagged: bool,
|
||||
) -> zyn::TokenStream
|
||||
{
|
||||
zyn::zyn!(
|
||||
|
||||
use oxydsp_flowgraph::io::PopIterable;
|
||||
// Iterate on inputs
|
||||
(
|
||||
@for (in_field in input_fields.iter())
|
||||
{
|
||||
&mut self.{{ in_field.ident }},
|
||||
}
|
||||
).pop_iter()
|
||||
.take(max_len)
|
||||
.for_each(
|
||||
// Deconstruct foreach arguments
|
||||
|
|
||||
(@for (in_field in input_fields.iter())
|
||||
{
|
||||
oxydsp_flowgraph::tag::Tagged({{in_field.ident.clone().unwrap() | ident:"{}_element"}},
|
||||
{{in_field.ident.clone().unwrap() | ident:"{}_tag_opt"}}),
|
||||
})
|
||||
|
|
||||
{
|
||||
// Create output tag
|
||||
let common_tag = oxydsp_flowgraph::tag::Tag::merge_tag_opts([
|
||||
@for (in_field in input_fields.iter())
|
||||
{
|
||||
{{in_field.ident.clone().unwrap() | ident:"{}_tag_opt"}}.clone(),
|
||||
}
|
||||
]);
|
||||
|
||||
let state = {{ sync_block_make_view_struct(item.clone()) }};
|
||||
// Compute output sample
|
||||
let @sync_block_output_descons(output_fields = output_fields.clone(), tagged = *tagged)
|
||||
= <Self as oxydsp_flowgraph::block::SyncBlock>::sync_work(state, @sync_block_input_cons(input_fields = input_fields.clone(), tagged = *tagged)).unwrap();
|
||||
|
||||
// Now the output samples must be sent to their resepective outputs
|
||||
@for (out_field in output_fields.iter())
|
||||
{
|
||||
@if (*tagged)
|
||||
{
|
||||
let _ = {{ out_field.ident.clone().unwrap() | ident: "{}_writer"}}.push(
|
||||
oxydsp_flowgraph::tag::Tagged(
|
||||
{{ out_field.ident.clone().unwrap() | ident: "{}_element"}},
|
||||
{{ out_field.ident.clone().unwrap() | ident: "{}_tag_opt"}}.merge(&common_tag),
|
||||
)
|
||||
);
|
||||
} @else
|
||||
{
|
||||
let _ = {{ out_field.ident.clone().unwrap() | ident: "{}_writer"}}.push(
|
||||
oxydsp_flowgraph::tag::Tagged(
|
||||
{{ out_field.ident.clone().unwrap() | ident: "{}_element"}},
|
||||
common_tag,
|
||||
)
|
||||
);
|
||||
}
|
||||
}
|
||||
//
|
||||
|
||||
}
|
||||
);
|
||||
)
|
||||
}
|
||||
|
||||
#[zyn::element]
|
||||
fn sync_block_output_descons(output_fields: Vec<Field>, tagged: bool) -> zyn::TokenStream
|
||||
{
|
||||
#[allow(clippy::collapsible_else_if)]
|
||||
if *tagged
|
||||
{
|
||||
// If tagged : deconstruct tags
|
||||
if output_fields.is_empty()
|
||||
{
|
||||
zyn::zyn!(_)
|
||||
}
|
||||
else if output_fields.len() == 1
|
||||
{
|
||||
zyn::zyn!(
|
||||
oxydsp_flowgraph::tag::Tagged({{output_fields[0].ident.clone().unwrap() | ident:"{}_element"}}, {{output_fields[0].ident.clone().unwrap() | ident:"{}_tag_opt"}})
|
||||
)
|
||||
}
|
||||
else
|
||||
{
|
||||
zyn::zyn!(
|
||||
(@for (in_field in output_fields.iter())
|
||||
{
|
||||
oxydsp_flowgraph::tag::Tagged({{in_field.ident.clone().unwrap() | ident:"{}_element"}}, {{in_field.ident.clone().unwrap() | ident:"{}_tag_opt"}}.clone()),
|
||||
}
|
||||
)
|
||||
)
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// Otherwise just get the output element
|
||||
if output_fields.is_empty()
|
||||
{
|
||||
zyn::zyn!(_)
|
||||
}
|
||||
else if output_fields.len() == 1
|
||||
{
|
||||
zyn::zyn!(
|
||||
{{output_fields[0].ident.clone().unwrap() | ident:"{}_element"}}
|
||||
)
|
||||
}
|
||||
else
|
||||
{
|
||||
zyn::zyn!(
|
||||
(@for (in_field in output_fields.iter())
|
||||
{
|
||||
{{in_field.ident.clone().unwrap() | ident:"{}_element"}},
|
||||
}
|
||||
)
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[zyn::element]
|
||||
fn sync_block_input_cons(input_fields: Vec<Field>, tagged: bool) -> zyn::TokenStream
|
||||
{
|
||||
#[allow(clippy::collapsible_else_if)]
|
||||
if *tagged
|
||||
{
|
||||
// If tagged : deconstruct tags
|
||||
if input_fields.is_empty()
|
||||
{
|
||||
zyn::zyn!(())
|
||||
}
|
||||
else if input_fields.len() == 1
|
||||
{
|
||||
zyn::zyn!(
|
||||
oxydsp_flowgraph::tag::Tagged({{input_fields[0].ident.clone().unwrap() | ident:"{}_element"}}, {{input_fields[0].ident.clone().unwrap() | ident:"{}_tag_opt"}})
|
||||
)
|
||||
}
|
||||
else
|
||||
{
|
||||
zyn::zyn!(
|
||||
(@for (in_field in input_fields.iter())
|
||||
{
|
||||
oxydsp_flowgraph::tag::Tagged({{in_field.ident.clone().unwrap() | ident:"{}_element"}}, {{in_field.ident.clone().unwrap() | ident:"{}_tag_opt"}}.clone()),
|
||||
}
|
||||
)
|
||||
)
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// Otherwise just get the output element
|
||||
if input_fields.is_empty()
|
||||
{
|
||||
zyn::zyn!(_)
|
||||
}
|
||||
else if input_fields.len() == 1
|
||||
{
|
||||
zyn::zyn!(
|
||||
{{input_fields[0].ident.clone().unwrap() | ident:"{}_element"}}
|
||||
)
|
||||
}
|
||||
else
|
||||
{
|
||||
zyn::zyn!(
|
||||
(@for (in_field in input_fields.iter())
|
||||
{
|
||||
{{in_field.ident.clone().unwrap() | ident:"{}_element"}},
|
||||
}
|
||||
)
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -9,8 +9,10 @@ pub enum BlockResult
|
||||
|
||||
// Signifies that the block finished its work
|
||||
// Running it again would be useless
|
||||
// This triggers the graph shutdown
|
||||
Terminated,
|
||||
|
||||
// Kill graph
|
||||
Exit,
|
||||
}
|
||||
|
||||
pub trait BlockIO
|
||||
|
||||
5
oxydsp-flowgraph/src/event.rs
Normal file
5
oxydsp-flowgraph/src/event.rs
Normal file
@ -0,0 +1,5 @@
|
||||
// Represents a FlowGrahWide, simultaneous event
|
||||
pub enum FlowGraphEvent
|
||||
{
|
||||
Kill(String),
|
||||
}
|
||||
@ -49,20 +49,15 @@ impl FlowGraph
|
||||
crate::block::BlockResult::Ok =>
|
||||
{}
|
||||
crate::block::BlockResult::Terminated =>
|
||||
{ //break 'outer;
|
||||
}
|
||||
crate::block::BlockResult::Exit =>
|
||||
{
|
||||
break 'outer;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for _ in 0..10_000
|
||||
{
|
||||
for x in self.blocks.iter_mut()
|
||||
{
|
||||
x.work();
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@ -131,7 +131,7 @@ impl<T: 'static> Out<T>
|
||||
}
|
||||
}
|
||||
|
||||
pub fn push_iter<I: Iterator<Item = (T, Option<Tag>)>>(&mut self, mut iter: I) -> bool
|
||||
pub fn push_iter<I: Iterator<Item = Tagged<T>>>(&mut self, mut iter: I) -> bool
|
||||
{
|
||||
let writer = self.write();
|
||||
let len = writer.len();
|
||||
@ -140,17 +140,7 @@ impl<T: 'static> Out<T>
|
||||
{
|
||||
if let Some(elt) = iter.next()
|
||||
{
|
||||
match elt.1
|
||||
{
|
||||
Some(tag) =>
|
||||
{
|
||||
let _ = writer.push(Tagged(elt.0, Some(tag)));
|
||||
}
|
||||
None =>
|
||||
{
|
||||
let _ = writer.push_no_tag(elt.0);
|
||||
}
|
||||
}
|
||||
let _ = writer.push(elt);
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -220,7 +210,13 @@ impl<T> OutWriter<'_, T>
|
||||
|
||||
pub fn push(&self, data: Tagged<T>) -> Result<(), Tagged<T>>
|
||||
{
|
||||
let (data, tag) = data.into();
|
||||
let (data, mut tag) = data.into();
|
||||
let position = self.data_writer.next_index();
|
||||
if let Some(tag) = &mut tag
|
||||
{
|
||||
tag.position = position
|
||||
}
|
||||
|
||||
match self.data_writer.push(data)
|
||||
{
|
||||
Ok(_) if tag.is_some() =>
|
||||
|
||||
@ -3,8 +3,10 @@
|
||||
|
||||
pub mod block;
|
||||
pub mod edge;
|
||||
pub mod event;
|
||||
pub mod graph;
|
||||
pub mod io;
|
||||
pub mod stream;
|
||||
pub mod tag;
|
||||
pub use oxydsp_flowgraph_macros::{BlockIO, sync_block};
|
||||
pub use oxydsp_flowgraph_macros::BlockIO;
|
||||
pub use oxydsp_flowgraph_macros::sync_block;
|
||||
|
||||
@ -22,6 +22,14 @@ pub struct Tag
|
||||
|
||||
impl Tag
|
||||
{
|
||||
pub fn new() -> Self
|
||||
{
|
||||
Self {
|
||||
position: 0,
|
||||
data: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn merge_tag_opts<const N: usize>(tag_opts: [Option<Tag>; N]) -> Option<Tag>
|
||||
{
|
||||
let mut out_tag = None;
|
||||
@ -31,6 +39,27 @@ impl Tag
|
||||
}
|
||||
out_tag
|
||||
}
|
||||
|
||||
pub fn tag<T: 'static + Send + Sync>(&self, key: impl AsRef<str>, value: T)
|
||||
{
|
||||
self.data
|
||||
.lock()
|
||||
.unwrap()
|
||||
.insert(key.as_ref().to_owned(), Arc::new(value));
|
||||
}
|
||||
|
||||
pub fn retrieve(&self, key: impl AsRef<str>) -> Option<Arc<dyn Any + Send + Sync>>
|
||||
{
|
||||
self.data.lock().unwrap().get(key.as_ref()).cloned()
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for Tag
|
||||
{
|
||||
fn default() -> Self
|
||||
{
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
pub trait TagValue: Clone {}
|
||||
@ -93,6 +122,10 @@ impl<T> Tagged<T>
|
||||
{
|
||||
pub fn new(inner: T, tag: Option<Tag>) -> Self
|
||||
{
|
||||
if tag.is_none()
|
||||
{
|
||||
//println!("data has no tag");
|
||||
}
|
||||
Self(inner, tag)
|
||||
}
|
||||
|
||||
@ -156,11 +189,11 @@ impl<T> From<(T, Option<Tag>)> for Tagged<T>
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Into<(T, Option<Tag>)> for Tagged<T>
|
||||
impl<T> From<Tagged<T>> for (T, Option<Tag>)
|
||||
{
|
||||
fn into(self) -> (T, Option<Tag>)
|
||||
fn from(val: Tagged<T>) -> Self
|
||||
{
|
||||
(self.0, self.1)
|
||||
(val.0, val.1)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user