Stream rework, qpsk example, splitter/merger

This commit is contained in:
2026-04-12 00:46:41 +02:00
parent 87921968b4
commit dad09cddcf
17 changed files with 474 additions and 142 deletions

View File

@ -4,6 +4,7 @@ use oxydsp_flowgraph::block::Block;
use oxydsp_flowgraph::block::BlockResult;
use oxydsp_flowgraph::io::In;
use oxydsp_flowgraph::io::Out;
use oxydsp_flowgraph::tag::Tag;
use std::iter::Sum;
use std::ops::Add;
use std::ops::Mul;
@ -11,7 +12,7 @@ use std::ops::Mul;
use crate::filtering::fir::Fir;
#[derive(BlockIO)]
pub struct FirFilter<F, T, O>
pub struct FirFilter<F, T, O, const D: usize = 1>
where
T: Clone + Zero + 'static,
F: Mul<T, Output = O> + Clone + 'static,
@ -26,7 +27,7 @@ where
filter: crate::filtering::fir::FirFilter<F, T, O>,
}
impl<F, T, O> FirFilter<F, T, O>
impl<F, T, O> FirFilter<F, T, O, 1>
where
T: Clone + Zero + 'static,
F: Mul<T, Output = O> + Clone + 'static,
@ -34,6 +35,19 @@ where
{
pub fn new(input: In<T>, impulse_response: Fir<F>) -> (Self, In<O>)
{
Self::new_decimating(input, impulse_response)
}
}
impl<F, T, O, const D: usize> FirFilter<F, T, O, D>
where
T: Clone + Zero + 'static,
F: Mul<T, Output = O> + Clone + 'static,
O: Add<O, Output = O> + Sum + Clone + Zero,
{
pub fn new_decimating(input: In<T>, impulse_response: Fir<F>) -> (Self, In<O>)
{
const { assert!(D != 0); };
let (output, filtered) = oxydsp_flowgraph::io::stream();
(
Self {
@ -46,15 +60,35 @@ where
}
}
impl<F, T, O> Block for FirFilter<F, T, O>
impl<F, T, O, const D: usize> Block for FirFilter<F, T, O, D>
where
T: Clone + Zero,
F: Mul<T, Output = O> + Clone + 'static,
O: Add<O, Output = O> + Sum + Clone + Zero,
{
fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult
fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult
{
self.output.push_iter(self.input.iter().map(|x| (self.filter.next(x.0), x.1).into()));
if D > 1
{
let mut input_iter = self.input.iter();
let mut output_pusher = self.output.write_push();
while input_iter.len() >= D && output_pusher.len() > 0
{
// TODO: Maybe find a better way to do this.
// I hope this is at least well optimized by the compilator
let batch: [_; D] = std::array::from_fn(|_| input_iter.next().unwrap());
let tag_batch: [_; D] = std::array::from_fn(|i| &batch[i].1);
let data_batch: [_; D] = std::array::from_fn(|i| batch[i].0.clone());
self.filter.insert_batch_ref(&data_batch);
let _ = output_pusher
.push((self.filter.filtered(), Tag::from_tag_opts(tag_batch.into_iter())).into());
}
}
else if D == 1
{
self.output.push_iter(self.input.iter().map(|x| (self.filter.next(x.0), x.1).into()));
}
BlockResult::Ok
}
}

View File

@ -58,7 +58,7 @@ where
.iter()
.zip(self.input_b.iter())
.map(|(x, y)| {
(x.0 + y.0, Tag::from_tag_opts(&[&x.1, &y.1])).into()
(x.0 + y.0, Tag::from_tag_opts([&x.1, &y.1].into_iter())).into()
})
);
BlockResult::Ok
@ -115,7 +115,7 @@ where
.iter()
.zip(self.input_b.iter())
.map(|(x, y)| {
(x.0 * y.0, Tag::from_tag_opts(&[&x.1, &y.1])).into()
(x.0 * y.0, Tag::from_tag_opts([&x.1, &y.1].into_iter())).into()
})
);
BlockResult::Ok

View File

@ -1,4 +1,5 @@
use std::iter::FusedIterator;
use std::mem::MaybeUninit;
use oxydsp_flowgraph::BlockIO;
use oxydsp_flowgraph::block::Block;
@ -24,7 +25,7 @@ pub struct Map<I: 'static, O: 'static, F>
impl<I: 'static, O: 'static, F> Map<I, O, F>
where
F: Fn(I) -> O,
F: FnMut(I) -> O,
{
pub fn new(input: In<I>, map: F) -> (Self, In<O>)
{
@ -35,15 +36,12 @@ where
impl<I: 'static, O: 'static, F> Block for Map<I, O, F>
where
F: Fn(I) -> O,
F: FnMut(I) -> O,
{
fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult
{
self.output.push_iter(
self.input
.iter()
.map(|x| ((&self.map)(x.0), x.1).into()),
);
self.output
.push_iter(self.input.iter().map(|x| ((&mut self.map)(x.0), x.1).into()));
BlockResult::Ok
}
}
@ -155,7 +153,7 @@ where
#[derive(BlockIO)]
pub struct Scan<I: 'static, O: 'static, S, F>
where
F: Fn(&mut S, I) -> O,
F: FnMut(&mut S, I) -> O,
{
#[input]
input: In<I>,
@ -170,7 +168,7 @@ where
impl<I: 'static, O: 'static, S, F> Scan<I, O, S, F>
where
F: Fn(&mut S, I) -> O,
F: FnMut(&mut S, I) -> O,
{
pub fn new(input: In<I>, initial_state: S, map: F) -> (Self, In<O>)
{
@ -191,7 +189,7 @@ impl<I, O, S, F> Block for Scan<I, O, S, F>
where
I: 'static,
O: 'static,
F: Fn(&mut S, I) -> O,
F: FnMut(&mut S, I) -> O,
{
fn work(&mut self) -> BlockResult
{
@ -250,7 +248,7 @@ where
self.output.push_iter(self.input.iter().map(|tagged| {
let cloned_tag = tagged.1.clone();
let (output, tag) = (self.map)(&mut self.state, (tagged.0, cloned_tag).into()).into();
(output, Tag::from_tag_opts(&[&tagged.1, &tag])).into()
(output, Tag::from_tag_opts([&tagged.1, &tag].into_iter())).into()
}));
BlockResult::Ok
}
@ -499,3 +497,122 @@ where
BlockResult::Ok
}
}
#[derive(BlockIO)]
pub struct Splitter<T: 'static + Clone, const N: usize>
{
#[input]
input: In<T>,
#[output]
outputs: [Out<T>; N],
}
impl<T: 'static + Clone, const N: usize> Splitter<T, N>
{
pub fn new(input: In<T>) -> (Self, [In<T>; N])
{
const { assert!(N > 0) }
let (outs, ins) = oxydsp_flowgraph::io::streams();
(
Self {
input,
outputs: outs,
},
ins,
)
}
}
impl<T: 'static + Clone, const N: usize> Block for Splitter<T, N>
{
fn work(&mut self) -> BlockResult
{
let mut input_iter = self.input.iter();
let mut outputs = self
.outputs
.iter_mut()
.map(|x| x.write_push())
.collect::<Vec<_>>();
let length = input_iter
.len()
.min(outputs.iter().map(|x| x.len()).min().unwrap());
for _ in 0..length
{
let pulled = input_iter.next().unwrap();
outputs.iter_mut().for_each(|x| {
let _ = x.push(pulled.clone());
});
}
BlockResult::Ok
}
}
#[derive(BlockIO)]
pub struct Merger<T: 'static, const N: usize>
{
#[input]
inputs: [In<T>; N],
#[output]
output: Out<[T; N]>,
}
impl<T: 'static, const N: usize> Merger<T, N>
{
pub fn new(inputs: [In<T>; N]) -> (Self, In<[T; N]>)
{
const { assert!(N > 0) }
let (out, inp) = oxydsp_flowgraph::io::stream();
(
Self {
inputs,
output: out,
},
inp,
)
}
}
impl<T: Sized + 'static, const N: usize> Block for Merger<T, N>
{
fn work(&mut self) -> BlockResult
{
let mut inputs = self.inputs.iter_mut().map(|x| x.iter()).collect::<Vec<_>>();
let mut output = self.output.write_push();
let len = inputs
.len()
.min(inputs.iter().map(|x| x.len()).min().unwrap());
let mut datas: [_; N] = std::array::from_fn(|_| MaybeUninit::uninit());
let mut tags: [_; N] = std::array::from_fn(|_| MaybeUninit::uninit());
for _ in 0..len
{
for (i, (data, tag)) in inputs
.iter_mut()
.map(|x| x.next().unwrap().into())
.enumerate()
{
datas[i] = MaybeUninit::new(data);
tags[i] = MaybeUninit::new(tag);
}
let ok_datas: [_; N] = unsafe {
std::array::from_fn(|i| std::mem::replace(&mut datas[i], MaybeUninit::uninit()).assume_init())
};
let ok_tags = unsafe {
std::mem::transmute::<&[MaybeUninit<Option<Tag>>; N], &[Option<Tag>; N]>(&tags)
};
let tag = Tag::from_tag_opts(ok_tags.into_iter());
output.push((ok_datas, tag).into());
}
BlockResult::Ok
}
}

View File

@ -1,3 +1,4 @@
use std::fmt::Debug;
use std::sync::mpsc::Receiver;
use std::sync::mpsc::Sender;
use std::sync::mpsc::SyncSender;
@ -44,12 +45,11 @@ impl<Tx, I: 'static> TxSink<Tx, I>
}
}
impl<I: 'static> Block for RxSource<Receiver<I>, I>
impl<I: 'static + Debug> Block for RxSource<Receiver<I>, I>
{
fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult
{
self
.output
self.output
.push_iter(self.input.try_iter().map(|x| (x, None).into()));
BlockResult::Ok
}

View File

@ -1,6 +1,7 @@
use num::Complex;
use num::Float;
use num::FromPrimitive;
use num::Num;
use num::One;
use num::Zero;
use num::complex::ComplexFloat;
@ -16,7 +17,6 @@ use crate::filtering::history_buf::HistoryBuf;
use crate::map;
use crate::units::DigitalFrequency;
/// Represents a finite impulse response as a vector
/// of values in time.
///
@ -26,8 +26,33 @@ use crate::units::DigitalFrequency;
///
/// For a reverb ir for example the clap would be at index 0
/// and the reverb tail towards the end of the vector.
#[derive(Clone)]
pub struct Fir<T>(pub Vec<T>);
impl<T> Fir<T>
where
T: Num + Clone + Sum,
{
pub fn convoluted_with(&self, other: &Fir<T>) -> Fir<T>
{
// Perform convolution
let mut new_fir = vec![];
let mut filter = FirFilter::<T, T, T>::new(self.clone());
for x in self.0.iter().rev()
{
new_fir.push(filter.next(x.clone()));
}
for _ in 0..other.0.len()
{
new_fir.push(filter.next(T::zero()));
}
Fir(new_fir)
}
}
impl<T> Fir<Complex<T>>
where
T: FftNum + Float + Clone,
@ -205,6 +230,32 @@ where
.collect(),
)
}
/// Creates a gaussion fir of length `length`
/// The maximum amplitude of the fir is 1.
/// The gaussian curve is centered in the fir.
/// The parameter `standard_deviations` specifies how many multiples of the
/// standard deviations is on the left and right of the center before the fir cuts it off.
pub fn gaussian(length: usize, standard_deviations: f32) -> Self
{
Self(
(0..length)
.map(|x| {
let t = map(
x as f64,
0.,
length as f64,
-standard_deviations as f64,
standard_deviations as f64,
);
// Gaussian with sd=1
let sq = t / 2.;
T::from_f64(-sq * sq).unwrap().exp()
})
.collect(),
)
}
}
/// A simple convolutional finite impulse response filter
@ -215,7 +266,7 @@ where
{
fir: Vec<F>,
//taps: VecDeque<T>,
taps: HistoryBuf<T>
taps: HistoryBuf<T>,
}
impl<F, T, O> FirFilter<F, T, O>
@ -234,14 +285,34 @@ where
}
}
/// Gets the next output given an input sample.
/// Inserts a new sample in the filter and get its new value
///
/// At the beginning, the delay line starts with zeroes.
pub fn next(&mut self, input: T) -> O
{
self.insert(input);
self.filtered()
}
pub fn insert(&mut self, input: T)
{
self.taps.push(input);
}
pub fn insert_batch_ref(&mut self, input: &[T])
{
for x in input.iter().cloned()
{
self.taps.push(x);
}
}
/// Gets the current value of the filter
/// given the sampels that it currently holds
pub fn filtered(&self) -> O
{
let taps = self.taps.as_slice();
Self::dot_prod(&self.fir, taps)
Self::dot_prod(&self.fir, taps)
}
pub fn dot_prod(a: &[F], b: &[T]) -> O
@ -254,12 +325,11 @@ where
let (b_chunks, b_remainder) = b.as_chunks::<4>();
for (x, y) in a_chunks.iter().zip(b_chunks.iter())
{
sum[0] = sum[0].clone() + x[0].clone() * y[0].clone();
sum[1] = sum[1].clone() + x[1].clone() * y[1].clone();
sum[2] = sum[2].clone() + x[2].clone() * y[2].clone();
sum[3] = sum[3].clone() + x[3].clone() * y[3].clone();
sum[0] = sum[0].clone() + x[0].clone() * y[0].clone();
sum[1] = sum[1].clone() + x[1].clone() * y[1].clone();
sum[2] = sum[2].clone() + x[2].clone() * y[2].clone();
sum[3] = sum[3].clone() + x[3].clone() * y[3].clone();
}
let mut sum = sum[0].clone() + sum[1].clone() + sum[2].clone() + sum[3].clone();