Compare commits

...

5 Commits

42 changed files with 2611 additions and 1150 deletions

74
Cargo.lock generated
View File

@ -823,6 +823,25 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "crossbeam-deque"
version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9dd111b7b7f7d55b72c0a6ae361660ee5853c9af73f70c3c2ef6858b950e2e51"
dependencies = [
"crossbeam-epoch",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-epoch"
version = "0.9.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e"
dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.21"
@ -2501,12 +2520,14 @@ dependencies = [
"num",
"oxydsp-flowgraph",
"rustfft",
"wide",
]
[[package]]
name = "oxydsp-flowgraph"
version = "0.1.0"
dependencies = [
"crossbeam-deque",
"oxydsp-flowgraph-macros",
]
@ -2713,6 +2734,22 @@ version = "0.1.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b5a041e753da8b807c9255f28de81879c78c876392ff2469cde94799b2896b9d"
[[package]]
name = "qpsk-modem"
version = "0.1.0"
dependencies = [
"cpal",
"eframe",
"egui",
"egui_plot",
"hound",
"num",
"oxydsp-dsp",
"oxydsp-flowgraph",
"rand",
"rand_distr",
]
[[package]]
name = "quick-error"
version = "2.0.1"
@ -2776,6 +2813,16 @@ version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c8d0fd677905edcbeedbf2edb6494d676f0e98d54d5cf9bda0b061cb8fb8aba"
[[package]]
name = "rand_distr"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4d431c2703ccf129de4d45253c03f49ebb22b97d6ad79ee3ecfc7e3f4862c1d8"
dependencies = [
"num-traits",
"rand",
]
[[package]]
name = "range-alloc"
version = "0.1.5"
@ -2888,6 +2935,15 @@ version = "1.0.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d"
[[package]]
name = "safe_arch"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1f7caad094bd561859bcd467734a720c3c1f5d1f338995351fefe2190c45efed"
dependencies = [
"bytemuck",
]
[[package]]
name = "same-file"
version = "1.0.6"
@ -3020,6 +3076,14 @@ version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3a9fe34e3e7a50316060351f37187a3f546bce95496156754b601a5fa71b76e"
[[package]]
name = "simple"
version = "0.1.0"
dependencies = [
"oxydsp-dsp",
"oxydsp-flowgraph",
]
[[package]]
name = "slab"
version = "0.4.12"
@ -3885,6 +3949,16 @@ dependencies = [
"web-sys",
]
[[package]]
name = "wide"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "198f6abc41fab83526d10880fa5c17e2b4ee44e763949b4bb34e2fd1e8ca48e4"
dependencies = [
"bytemuck",
"safe_arch",
]
[[package]]
name = "winapi-util"
version = "0.1.11"

View File

@ -5,3 +5,8 @@ members = [
"oxydsp-dsp",
"oxydsp-flowgraph"
]
[profile.release-with-debug]
inherits = "release"
debug = true

View File

@ -8,7 +8,6 @@ use oxydsp_dsp::blocks::synthesis::Nco;
use oxydsp_dsp::blocks::synthesis::OscillatorSource;
use oxydsp_dsp::blocks::utilities::adapters::FlatMap;
use oxydsp_dsp::blocks::utilities::adapters::Map;
use oxydsp_dsp::blocks::utilities::adapters::Scan;
use oxydsp_dsp::blocks::utilities::channels::RxSource;
use oxydsp_dsp::blocks::utilities::channels::TxSink;
use oxydsp_dsp::filtering::fir::Fir;
@ -17,20 +16,17 @@ use oxydsp_flowgraph::flowgraph;
use oxydsp_flowgraph::io::In;
use rand::random;
use std::f32::consts::PI;
use std::net::UdpSocket;
use std::ops::BitXor;
use std::sync::mpsc;
use std::sync::mpsc::Receiver;
use std::sync::mpsc::SyncSender;
use std::sync::mpsc::sync_channel;
use std::thread::JoinHandle;
use std::time::Duration;
use crate::CARRIER;
use crate::DEVIATION;
use crate::SAMPLE_PER_SYMBOL;
use crate::SAMPLE_RATE;
use crate::gaussian;
use crate::to_bits;
pub struct Transmitter

View File

@ -40,6 +40,7 @@ use oxydsp_dsp::filtering::fir::Fir;
use oxydsp_dsp::units::DigitalFrequency;
use oxydsp_flowgraph::block::BlockResult;
use oxydsp_flowgraph::flowgraph;
use oxydsp_flowgraph::io::AnonymousIn;
use oxydsp_flowgraph::io::In;
use oxydsp_flowgraph::tag::Tags;
use rand::random;

View File

@ -0,0 +1,16 @@
[package]
name = "qpsk-modem"
version = "0.1.0"
edition = "2024"
[dependencies]
oxydsp-flowgraph = {path = "../../oxydsp-flowgraph/"}
oxydsp-dsp = {path = "../../oxydsp-dsp/"}
egui = "0.33.3"
egui_plot = "0.34.1"
eframe = { version = "0.33.3", features = ["default_fonts", "wayland"] }
num = "0.4.3"
hound = "3.5.1"
rand = "0.10.0"
cpal = "0.17.3"
rand_distr = "0.6.0"

BIN
examples/qpsk-modem/mod.wav Normal file

Binary file not shown.

View File

@ -0,0 +1,431 @@
use cpal::traits::DeviceTrait;
use cpal::traits::HostTrait;
use egui::Color32;
use egui_plot::Line;
use egui_plot::PlotPoints;
use egui_plot::Points;
use num::Complex;
use num::complex::ComplexFloat;
use oxydsp_dsp::blocks::filtering::fir::FirFilter;
use oxydsp_dsp::blocks::filtering::pulse_shaping::PulseShaper;
use oxydsp_dsp::blocks::math::basic::Multiplier;
use oxydsp_dsp::blocks::synthesis::OscillatorSource;
use oxydsp_dsp::blocks::ted::early_late::EarlyLateGate;
use oxydsp_dsp::blocks::utilities::adapters::Map;
use oxydsp_dsp::blocks::utilities::adapters::Merger;
use oxydsp_dsp::blocks::utilities::adapters::NullSink;
use oxydsp_dsp::blocks::utilities::adapters::Scan;
use oxydsp_dsp::blocks::utilities::adapters::ScanTagged;
use oxydsp_dsp::blocks::utilities::adapters::Splitter;
use oxydsp_dsp::blocks::utilities::channels::RxSource;
use oxydsp_dsp::blocks::utilities::channels::TxSink;
use oxydsp_dsp::blocks::utilities::graph_control::GraphKiller;
use oxydsp_dsp::blocks::utilities::iter::IterSource;
use oxydsp_dsp::filtering::fir::Fir;
use oxydsp_dsp::filtering::history_buf::HistoryBuf;
use oxydsp_dsp::map;
use oxydsp_dsp::units::DigitalFrequency;
use oxydsp_flowgraph::BlockIO;
use oxydsp_flowgraph::block::Block;
use oxydsp_flowgraph::flowgraph;
use oxydsp_flowgraph::io::In;
use oxydsp_flowgraph::io::Out;
use oxydsp_flowgraph::tag::Tagged;
use oxydsp_flowgraph::tag::Tags;
use rand::random;
const CARRRIER_FREQ: f64 = 1000.;
const SAMPLE_RATE: usize = 48_000;
const SAMPLE_PER_SYMBOL: usize = 100;
#[derive(BlockIO)]
pub struct CostasLoop
{
#[input]
input: In<f32>,
#[output]
output: Out<Complex<f32>>,
center_freq: DigitalFrequency,
nco: oxydsp_dsp::synthesis::oscillator::Nco<f32>,
loop_filter: oxydsp_dsp::filtering::fir::FirFilter<f32, f32, f32>,
low_pass: oxydsp_dsp::filtering::fir::FirFilter<Complex<f32>, Complex<f32>, Complex<f32>>,
}
impl CostasLoop
{
pub fn new(
input: In<f32>,
start_frequency: DigitalFrequency,
loop_filter: Fir<f32>,
) -> (Self, In<Complex<f32>>)
{
let (output, iq) = oxydsp_flowgraph::io::stream();
(
CostasLoop {
input,
output,
center_freq: start_frequency,
nco: start_frequency.into(),
loop_filter: oxydsp_dsp::filtering::fir::FirFilter::new(loop_filter),
low_pass: oxydsp_dsp::filtering::fir::FirFilter::new(
Fir::lowpass(start_frequency, 100).normalized_len(),
),
},
iq,
)
}
}
impl Block for CostasLoop
{
fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult
{
self.output.push_iter(self.input.iter().map(|x| {
let (signal, tag) = x.into();
let lo = self.nco.next().unwrap();
let iq = self
.low_pass
.next(Complex::new(lo.re * signal, lo.im * signal));
let error = iq.re * iq.im.signum() - iq.im * iq.re.signum();
let correction = self.loop_filter.next(error);
self.nco.set_frequency(DigitalFrequency::from_rad(
self.center_freq.as_rad() + correction as f64,
));
(iq, tag).into()
}));
oxydsp_flowgraph::block::BlockResult::Ok
}
}
pub fn main()
{
demodulator();
//modulator();
}
pub fn modulator()
{
let bit_source = (0..8192).map(|_| [random::<bool>(), random::<bool>()]);
let mut tags = Tags::default();
let (mut iter_source, bits) = IterSource::new(bit_source);
let kill_tag = tags.allocate_tag("Kill tag");
iter_source.tag_last_with(kill_tag.clone());
let (to_iq, iq) = Map::new(bits, |x| match x
{
[true, true] => Complex::new(1., 1.),
[true, false] => Complex::new(1., -1.),
[false, true] => Complex::new(-1., 1.),
[false, false] => Complex::new(-1., -1.),
});
// let (pulse_shaper, iq) = PulseShaper::new(
// iq,
// Fir::root_raised_cosine(4 * SAMPLE_PER_SYMBOL, 1., SAMPLE_PER_SYMBOL),
// SAMPLE_PER_SYMBOL,
// );
let (pulse_shaper, iq) = PulseShaper::new(
iq,
Fir::gaussian(SAMPLE_PER_SYMBOL, 3.),
SAMPLE_PER_SYMBOL,
);
let (carrier_oscillator, carrier) = OscillatorSource::new(
DigitalFrequency::from_time_frequency(CARRRIER_FREQ, SAMPLE_RATE as f64).into(),
);
let (mixer, passband) = Multiplier::new(iq, carrier);
let (tx, rx) = std::sync::mpsc::channel::<Complex<f32>>();
let (graph_killer, passband) = GraphKiller::new(passband, kill_tag);
let tx_sink = TxSink::new(passband, tx);
let graph = flowgraph![
iter_source,
to_iq,
pulse_shaper,
carrier_oscillator,
mixer,
graph_killer,
tx_sink
];
graph.run(1);
let spec = hound::WavSpec {
channels: 1,
sample_rate: SAMPLE_RATE as u32,
bits_per_sample: 16,
sample_format: hound::SampleFormat::Int,
};
let mut writer = hound::WavWriter::create("mod.wav", spec).unwrap();
for iq in rx.iter()
{
let amplitude = 0.5 * i16::MAX as f32;
writer.write_sample((iq.re * amplitude) as i16).unwrap();
}
writer.finalize().unwrap();
}
pub fn demodulator()
{
let (audio_tx, audio_rx) = std::sync::mpsc::channel();
let (rx_source, signal) = RxSource::new(audio_rx);
let (downconverter, iq) = CostasLoop::new(
signal,
DigitalFrequency::from_time_frequency(CARRRIER_FREQ, SAMPLE_RATE as f64),
Fir::proportional_integral(100, 0.01, 0.00005),
);
//let mut agc_filter = oxydsp_dsp::filtering::fir::FirFilter::new(Fir::proportional_integral(100, 0.1, 0.001));
let (agc_error_tx, agc_error_rx) = std::sync::mpsc::channel();
let (agc, iq) = Scan::new(iq, (1., 0.), move |(gain, error_low), iq| {
let out = *gain * iq;
let error = 1. - out.abs();
let alpha = 0.01;
*error_low = (1. - alpha) * *error_low + alpha * error;
let _ = agc_error_tx.send(*error_low);
*gain += 0.01 * error; // Feedback
out
});
const DECIMATION: usize = 1;
// let (matched_filter, iq) = FirFilter::<_, _, _, DECIMATION>::new_decimating(
// iq,
// Fir::<Complex<f32>>::lowpass(DigitalFrequency::from_time_frequency(2000., SAMPLE_RATE as f64), 100)
// .normalized_len().convoluted_with(&Fir::<Complex<f32>>::root_raised_cosine(4 * SAMPLE_PER_SYMBOL, 1., SAMPLE_PER_SYMBOL)
// .normalized_sqr())
// );
// let (matched_filter, iq) = FirFilter::<_, _, _, DECIMATION>::new_decimating(
// iq,
// Fir::<Complex<f32>>::lowpass(DigitalFrequency::from_time_frequency(2000., SAMPLE_RATE as f64), 100)
// .normalized_len().convoluted_with(&Fir::<Complex<f32>>::gaussian(SAMPLE_PER_SYMBOL, 3.).normalized_sqr())
// );
let (matched_filter, iq) = FirFilter::<_, _, _, DECIMATION>::new_decimating(
iq,
Fir::<Complex<f32>>::gaussian(SAMPLE_PER_SYMBOL, 3.).normalized_sqr()
);
let (splitter, [iq_i, iq_q]) = Splitter::new(iq);
let (proj_i, i) = Map::new(iq_i, |x| x.re);
let (proj_q, q) = Map::new(iq_q, |x| x.im);
let mut tags = Tags::default();
let elg_filter = Fir::proportional_integral(100, 0.2, 0.002);
let i_key = tags.allocate_tag("i tag");
let q_key = tags.allocate_tag("q tag");
let (elg_i, i) = EarlyLateGate::new(i, elg_filter.clone(), SAMPLE_PER_SYMBOL / DECIMATION, i_key.clone());
let (elg_q, q) = EarlyLateGate::new(q, elg_filter.clone(), SAMPLE_PER_SYMBOL / DECIMATION, q_key.clone());
let (eye_i_tx, eye_i_rx) = std::sync::mpsc::channel::<Vec<f32>>();
let (eye_q_tx, eye_q_rx) = std::sync::mpsc::channel::<Vec<f32>>();
let (merger, iq) = Merger::new([i, q]);
let (constellation_tx, constellation_rx) = std::sync::mpsc::channel();
let (debug, iq) = ScanTagged::new(
iq,
(
HistoryBuf::new(0., (SAMPLE_PER_SYMBOL * 2) / DECIMATION),
HistoryBuf::new(0., (SAMPLE_PER_SYMBOL * 2) / DECIMATION),
),
move |(buf_i, buf_q), input| {
let ([re, im], tag) = input.into();
buf_i.push(re);
buf_q.push(im);
if tag.is_some_and(|t| t.retrieve(&i_key).is_some())
{
let _ = constellation_tx.send(Complex::new(re, im));
let _ = eye_i_tx.send(buf_i.as_slice().iter().copied().collect::<Vec<_>>());
let _ = eye_q_tx.send(buf_q.as_slice().iter().copied().collect::<Vec<_>>());
}
//(Complex::new(re, im), None).into()
let k: Tagged<()> = ((), None).into();
k
},
);
let tx_sink = NullSink::new(iq);
let host = cpal::default_host();
let device = host
.default_input_device()
.expect("no output device available");
let mut supported_configs_range = device
.supported_input_configs()
.expect("error while querying configs");
let supported_config = supported_configs_range
.next()
.expect("no supported config?!")
.with_sample_rate(SAMPLE_RATE as u32);
let _stream = device
.build_input_stream(
&supported_config.into(),
move |data: &[f32], _: &cpal::InputCallbackInfo| {
for x in data
{
let _ = audio_tx.send(*x * 100.);
}
},
move |_err| {},
None, // None=blocking, Some(Duration)=timeout
)
.unwrap();
let graph = flowgraph![
rx_source,
downconverter,
agc,
matched_filter,
splitter,
proj_i,
proj_q,
elg_i,
elg_q,
merger,
debug,
tx_sink
];
graph.run(6);
let mut eye_i_history = HistoryBuf::new(vec![], 200);
let mut eye_q_history = HistoryBuf::new(vec![], 200);
let mut constellation = HistoryBuf::new(Complex::new(0., 0.), 500);
let mut agc_error = HistoryBuf::new(0., 50_000);
eframe::run_simple_native("Window", Default::default(), move |ctx, _frame| {
for eye in eye_i_rx.try_iter().take(200)
{
eye_i_history.push(eye);
}
for eye in eye_q_rx.try_iter().take(200)
{
eye_q_history.push(eye);
}
for point in constellation_rx.try_iter().take(500)
{
constellation.push(point);
}
for x in agc_error_rx.try_iter().take(5000)
{
agc_error.push(x);
}
egui::CentralPanel::default().show(ctx, |ui| {
egui_plot::Plot::new("plot")
.data_aspect(1.)
.show(ui, |plot_ui| {
plot_ui.points(
Points::new(
"Constellation",
constellation
.as_slice()
.iter()
.map(|point| [point.re as f64, point.im as f64])
.collect::<PlotPoints>(),
)
.color(Color32::YELLOW.gamma_multiply_u8(70)),
);
plot_ui.line(
Line::new(
"AGC Error",
agc_error
.as_slice()
.iter()
.enumerate()
.map(|(i, point)| [map(i as f64, 0., 50_000., -2., 2.), *point as f64 + 2.])
.collect::<PlotPoints>(),
)
.color(Color32::YELLOW.gamma_multiply_u8(70)),
);
for (eye_i, eye_q) in eye_i_history
.as_slice()
.iter()
.zip(eye_q_history.as_slice().iter())
{
plot_ui.line(
Line::new(
"In-Phase",
eye_i
.iter()
.enumerate()
.map(|(i, x)| {
[i as f64 / ((SAMPLE_PER_SYMBOL as f64 * 2.) / DECIMATION as f64) + 1., *x as f64]
})
.collect::<Vec<_>>(),
)
.color(Color32::RED),
);
plot_ui.line(
Line::new(
"Quadrature",
eye_q
.iter()
.enumerate()
.map(|(i, x)| {
[*x as f64, i as f64 / ((SAMPLE_PER_SYMBOL as f64 * 2.) / DECIMATION as f64) - 2.]
})
.collect::<Vec<_>>(),
)
.color(Color32::GREEN),
);
// plot_ui.points(
// Points::new(
// "Constellation",
// eye_i
// .iter()
// .zip(eye_q.iter())
// .skip(SAMPLE_PER_SYMBOL / DECIMATION)
// .step_by(SAMPLE_PER_SYMBOL / DECIMATION)
// .map(|(i, q)| [*i as f64, *q as f64])
// .collect::<PlotPoints>(),
// )
// .color(Color32::GREEN)
// .radius(1.5),
// );
}
});
});
ctx.request_repaint();
})
.unwrap();
}
pub fn to_bits(n: u8) -> [bool; 8]
{
[
(n & 1) == 1,
(n >> 1) & 1 == 1,
(n >> 2) & 1 == 1,
(n >> 3) & 1 == 1,
(n >> 4) & 1 == 1,
(n >> 5) & 1 == 1,
(n >> 6) & 1 == 1,
(n >> 7) & 1 == 1,
]
}
pub fn from_bits(n: [bool; 8]) -> u8
{
(n[0] as u8)
| ((n[1] as u8) << 1)
| ((n[2] as u8) << 2)
| ((n[3] as u8) << 3)
| ((n[4] as u8) << 4)
| ((n[5] as u8) << 5)
| ((n[6] as u8) << 6)
| ((n[7] as u8) << 7)
}

View File

@ -0,0 +1,8 @@
[package]
name = "simple"
version = "0.1.0"
edition = "2024"
[dependencies]
oxydsp-flowgraph = {path = "../../oxydsp-flowgraph/"}
oxydsp-dsp = {path = "../../oxydsp-dsp/"}

View File

@ -0,0 +1,31 @@
use std::time::Duration;
use oxydsp_dsp::blocks::utilities::adapters::NullSink;
use oxydsp_dsp::blocks::utilities::adapters::Scan;
use oxydsp_dsp::blocks::utilities::channels::RxSource;
use oxydsp_flowgraph::flowgraph;
fn main()
{
let (tx, rx) = std::sync::mpsc::channel();
let (rx_source, numbers) = RxSource::new(rx);
let (inspect, numbers) = Scan::new(numbers, 0, |state, x: usize| {
if x.is_multiple_of(100)
{
println!("{}", x);
}
x
});
let null_sink = NullSink::new(numbers);
let graph = flowgraph![rx_source, inspect, null_sink];
std::thread::spawn(move || {
let mut x = 0usize;
loop
{
let _ = tx.send(x);
x += 1;
}
});
graph.run(1).join();
}

View File

@ -7,3 +7,4 @@ edition = "2024"
num = "0.4.3"
oxydsp-flowgraph = {path = "../oxydsp-flowgraph/"}
rustfft = "6.4.1"
wide = "1.2.0"

View File

@ -1 +1,2 @@
pub mod fir;
pub mod pulse_shaping;

View File

@ -1,20 +1,22 @@
use num::Zero;
use oxydsp_flowgraph::BlockIO;
use oxydsp_flowgraph::block::SyncBlock;
use oxydsp_flowgraph::block::Block;
use oxydsp_flowgraph::block::BlockResult;
use oxydsp_flowgraph::io::In;
use oxydsp_flowgraph::io::Out;
use oxydsp_flowgraph::sync_block;
use oxydsp_flowgraph::tag::Tag;
use std::iter::Sum;
use std::ops::Add;
use std::ops::Mul;
use crate::filtering::fir::Fir;
#[derive(BlockIO)]
#[sync_block]
pub struct FirFilter<F, T, O>
pub struct FirFilter<F, T, O, const D: usize = 1>
where
T: Clone + 'static,
T: Clone + Zero + 'static,
F: Mul<T, Output = O> + Clone + 'static,
O: Sum + 'static,
O: Add<O, Output = O> + Sum + Clone + Zero + 'static,
{
#[input]
input: In<T>,
@ -25,14 +27,27 @@ where
filter: crate::filtering::fir::FirFilter<F, T, O>,
}
impl<F, T, O> FirFilter<F, T, O>
impl<F, T, O> FirFilter<F, T, O, 1>
where
T: Clone + 'static,
T: Clone + Zero + 'static,
F: Mul<T, Output = O> + Clone + 'static,
O: Sum + 'static,
O: Add<O, Output = O> + Sum + Clone + Zero,
{
pub fn new(input: In<T>, impulse_response: Fir<F>) -> (Self, In<O>)
{
Self::new_decimating(input, impulse_response)
}
}
impl<F, T, O, const D: usize> FirFilter<F, T, O, D>
where
T: Clone + Zero + 'static,
F: Mul<T, Output = O> + Clone + 'static,
O: Add<O, Output = O> + Sum + Clone + Zero,
{
pub fn new_decimating(input: In<T>, impulse_response: Fir<F>) -> (Self, In<O>)
{
const { assert!(D != 0); };
let (output, filtered) = oxydsp_flowgraph::io::stream();
(
Self {
@ -45,14 +60,35 @@ where
}
}
impl<'view, F, T, O> SyncBlock<'view> for FirFilter<F, T, O>
impl<F, T, O, const D: usize> Block for FirFilter<F, T, O, D>
where
T: Clone + 'view,
T: Clone + Zero,
F: Mul<T, Output = O> + Clone + 'static,
O: Sum + 'static,
O: Add<O, Output = O> + Sum + Clone + Zero,
{
fn sync_work(state: Self::StateView, input: Self::Input) -> Option<Self::Output>
fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult
{
Some(state.filter.next(input))
if D > 1
{
let mut input_iter = self.input.iter();
let mut output_pusher = self.output.write_push();
while input_iter.len() >= D && output_pusher.len() > 0
{
// TODO: Maybe find a better way to do this.
// I hope this is at least well optimized by the compilator
let batch: [_; D] = std::array::from_fn(|_| input_iter.next().unwrap());
let tag_batch: [_; D] = std::array::from_fn(|i| &batch[i].1);
let data_batch: [_; D] = std::array::from_fn(|i| batch[i].0.clone());
self.filter.insert_batch_ref(&data_batch);
let _ = output_pusher
.push((self.filter.filtered(), Tag::from_tag_opts(tag_batch.into_iter())).into());
}
}
else if D == 1
{
self.output.push_iter(self.input.iter().map(|x| (self.filter.next(x.0), x.1).into()));
}
BlockResult::Ok
}
}

View File

@ -0,0 +1,77 @@
use std::iter::Sum;
use std::ops::Add;
use num::Zero;
use oxydsp_flowgraph::BlockIO;
use oxydsp_flowgraph::block::Block;
use oxydsp_flowgraph::io::In;
use oxydsp_flowgraph::io::Out;
use crate::filtering::fir::Fir;
use crate::filtering::fir::FirFilter;
#[derive(BlockIO)]
pub struct PulseShaper<T: 'static + std::ops::Mul<Output = T> + std::iter::Sum + Add<T, Output = T> + Sum + Clone + Zero>
{
#[input]
input: In<T>,
#[output]
output: Out<T>,
symbol_length: usize,
remaining: usize,
pulse_shaper: FirFilter<T, T, T>,
}
impl<T: 'static + std::ops::Mul<Output = T> + std::iter::Sum + std::clone::Clone + Zero> PulseShaper<T>
{
pub fn new(input: In<T>, pulse_shape: Fir<T>, symbol_length: usize) -> (Self, In<T>)
{
let (output, pulse_shaped) = oxydsp_flowgraph::io::stream();
(
Self {
input,
output,
symbol_length,
remaining: 0,
pulse_shaper: FirFilter::new(pulse_shape),
},
pulse_shaped,
)
}
}
impl<T: 'static + std::ops::Mul<Output = T> + std::iter::Sum + std::clone::Clone + Zero> Block
for PulseShaper<T>
{
fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult
{
let mut reader = self.input.iter();
let mut writer = self.output.write_push();
for _ in 0..writer.len()
{
if self.remaining == 0
{
if let Some(input) = reader.next()
{
let (data, tag) = input.into();
let _ = writer.push((self.pulse_shaper.next(data), tag).into());
self.remaining = self.symbol_length - 1;
}
else
{
return oxydsp_flowgraph::block::BlockResult::Ok;
}
}
else
{
let _ = writer.push(self.pulse_shaper.next(T::zero()).into());
self.remaining -= 1;
}
}
oxydsp_flowgraph::block::BlockResult::Ok
}
}

View File

@ -1,19 +1,18 @@
use num::{Complex, Float};
use oxydsp_flowgraph::{
BlockIO,
block::SyncBlock,
io::{In, Out},
sync_block,
};
use std::fmt::Debug;
use num::Complex;
use num::Float;
use oxydsp_flowgraph::BlockIO;
use oxydsp_flowgraph::block::Block;
use oxydsp_flowgraph::io::In;
use oxydsp_flowgraph::io::Out;
use rustfft::FftNum;
use crate::{
filtering::fir::{Fir, FirFilter},
synthesis::oscillator::Nco,
};
use crate::filtering::fir::Fir;
use crate::filtering::fir::FirFilter;
use crate::synthesis::oscillator::Nco;
#[derive(BlockIO)]
#[sync_block]
pub struct ZeroIf<T: std::clone::Clone + num::Num + Float + From<f32> + 'static>
{
#[input]
@ -28,7 +27,13 @@ pub struct ZeroIf<T: std::clone::Clone + num::Num + Float + From<f32> + 'static>
impl<T> ZeroIf<T>
where
T: std::clone::Clone + num::Num + FftNum + From<f32> + 'static + num::Float,
T: std::clone::Clone
+ num::Num
+ FftNum
+ From<f32>
+ 'static
+ num::Float
+ num::traits::FloatConst,
{
pub fn new(input: In<T>, lo: Nco<T>) -> (Self, In<Complex<T>>)
{
@ -38,7 +43,7 @@ where
input,
output,
local_oscillator: lo,
filter: FirFilter::new(Fir::lowpass(lo.frequency(), 100)),
filter: FirFilter::new(Fir::lowpass(lo.frequency(), 100).normalized_len()),
},
port,
)
@ -50,16 +55,20 @@ where
}
}
impl<'view, T> SyncBlock<'view> for ZeroIf<T>
impl<T> Block for ZeroIf<T>
where
T: std::clone::Clone + num::Num + Float + From<f32> + 'static + num::Float,
{
fn sync_work(state: Self::StateView, input: Self::Input) -> Option<Self::Output>
fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult
{
// Mix
let lo_sample = state.local_oscillator.next().unwrap();
let iq = Complex::new(input * lo_sample.re, input * lo_sample.im);
self.output.push_iter(self.input.iter().map(|input| {
let (data, tag) = input.into();
// Mix
let lo_sample = self.local_oscillator.next().unwrap();
let iq = Complex::new(data * lo_sample.re, data * lo_sample.im);
Some(state.filter.next(iq))
(self.filter.next(iq), tag).into()
}));
oxydsp_flowgraph::block::BlockResult::Ok
}
}

View File

@ -4,15 +4,11 @@ use std::ops::Mul;
use oxydsp_flowgraph::BlockIO;
use oxydsp_flowgraph::block::Block;
use oxydsp_flowgraph::block::BlockResult;
use oxydsp_flowgraph::block::SyncBlock;
use oxydsp_flowgraph::io::In;
use oxydsp_flowgraph::io::Out;
use oxydsp_flowgraph::io::PopIterable;
use oxydsp_flowgraph::sync_block;
use oxydsp_flowgraph::tag::TagMergable;
use oxydsp_flowgraph::tag::Tag;
#[derive(BlockIO)]
#[sync_block]
pub struct Adder<Ia, Ib, O>
where
Ia: Add<Ib, Output = O> + 'static,
@ -49,35 +45,26 @@ where
}
}
impl<'view, Ia, Ib, O> SyncBlock<'view> for Adder<Ia, Ib, O>
impl<Ia, Ib, O> Block for Adder<Ia, Ib, O>
where
Ia: Add<Ib, Output = O> + 'static,
Ib: 'static,
O: 'static,
{
fn sync_work(_state: Self::StateView, input: Self::Input) -> Option<Self::Output>
fn work(&mut self) -> BlockResult
{
Some(input.0 + input.1)
self.output.push_iter(
self.input_a
.iter()
.zip(self.input_b.iter())
.map(|(x, y)| {
(x.0 + y.0, Tag::from_tag_opts([&x.1, &y.1].into_iter())).into()
})
);
BlockResult::Ok
}
}
// impl<Ia, Ib, O> Block for Adder<Ia, Ib, O>
// where
// Ia: Add<Ib, Output = O> + 'static,
// Ib: 'static,
// O: 'static,
// {
// fn work(&mut self) -> BlockResult
// {
// self.output.push_iter(
// (&mut self.input_a, &mut self.input_b)
// .pop_iter()
// .map(|(a, b)| (a.0 + b.0, a.1.merge(&b.1))),
// );
// BlockResult::Ok
// }
// }
#[derive(BlockIO)]
pub struct Multiplier<Ia, Ib, O>
where
@ -124,9 +111,12 @@ where
fn work(&mut self) -> BlockResult
{
self.output.push_iter(
(&mut self.input_a, &mut self.input_b)
.pop_iter()
.map(|(a, b)| (a.0 * b.0, a.1.merge(&b.1)).into()),
self.input_a
.iter()
.zip(self.input_b.iter())
.map(|(x, y)| {
(x.0 * y.0, Tag::from_tag_opts([&x.1, &y.1].into_iter())).into()
})
);
BlockResult::Ok
}

View File

@ -6,7 +6,6 @@ use oxydsp_flowgraph::block::Block;
use oxydsp_flowgraph::block::BlockResult;
use oxydsp_flowgraph::io::In;
use oxydsp_flowgraph::io::Out;
use oxydsp_flowgraph::io::PopIterable;
use oxydsp_flowgraph::io::stream;
#[derive(BlockIO)]
@ -81,7 +80,7 @@ impl<T: Float + From<f32> + 'static> Block for Nco<T>
fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult
{
self.output
.push_iter(&mut self.frequency.pop_iter().map(|f| {
.push_iter(&mut self.frequency.iter().map(|f| {
self.nco.set_frequency(f.0);
(self.nco.next().unwrap(), f.1).into()
}));

View File

@ -1,21 +1,19 @@
use std::collections::VecDeque;
use std::iter::Sum;
use num::Float;
use num::NumCast;
use oxydsp_flowgraph::BlockIO;
use oxydsp_flowgraph::block::SyncBlock;
use oxydsp_flowgraph::block::Block;
use oxydsp_flowgraph::io::In;
use oxydsp_flowgraph::io::Out;
use oxydsp_flowgraph::sync_block;
use oxydsp_flowgraph::tag::Tag;
use oxydsp_flowgraph::tag::TagKey;
use crate::filtering::fir::Fir;
use crate::filtering::fir::FirFilter;
use crate::filtering::history_buf::HistoryBuf;
#[derive(BlockIO)]
#[sync_block(tagged)]
pub struct EarlyLateGate<T: Float + Send + Sync + Sum + Clone + NumCast + 'static>
{
#[input]
@ -27,7 +25,7 @@ pub struct EarlyLateGate<T: Float + Send + Sync + Sum + Clone + NumCast + 'stati
symbol_length: usize,
// Window looking at symbol_length samples at a time
window: VecDeque<T>,
window: HistoryBuf<T>,
// The current location of the window, in relation to the last sample
window_location: usize,
@ -44,7 +42,7 @@ pub struct EarlyLateGate<T: Float + Send + Sync + Sum + Clone + NumCast + 'stati
impl<T> EarlyLateGate<T>
where
T: Float + Sum + Clone + 'static + Send + Sync + NumCast,
T: Float + Sum + Clone + 'static + Send + Sync + NumCast + Default,
{
pub fn new(
input: In<T>,
@ -58,7 +56,7 @@ where
Self {
input,
output,
window: VecDeque::with_capacity(symbol_length),
window: HistoryBuf::new(Default::default(), symbol_length),
symbol_length,
window_location: 0,
window_center: symbol_length / 2,
@ -72,48 +70,44 @@ where
}
}
impl<'view, T> SyncBlock<'view> for EarlyLateGate<T>
impl<'view, T> Block for EarlyLateGate<T>
where
T: Float + Sum + Clone + 'static + Send + Sync + NumCast,
{
fn sync_work(state: Self::StateView, input: Self::Input) -> Option<Self::Output>
fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult
{
if state.window.len() < *state.symbol_length
{
state.window.push_back(input.0);
*state.window_location += 1;
return Some(input.0.into());
}
self.output.push_iter(self.input.iter().map(|input| {
// Bring new sample in
self.window.push(input.0);
self.window_location += 1;
// Bring new sample in
state.window.pop_front();
state.window.push_back(input.0);
*state.window_location += 1;
let sample = self.window.as_slice()[self.window_center];
let mut tag = None;
if self.window_location >= self.next_sample as usize
{
let early_index = self.window_center - (0.25 * self.symbol_length as f32) as usize;
let late_index = self.window_center + (0.25 * self.symbol_length as f32) as usize;
let sample = state.window[*state.window_center];
let mut tag = None;
if *state.window_location >= *state.next_sample as usize
{
let early_index = *state.window_center - (0.25 * *state.symbol_length as f32) as usize;
let late_index = *state.window_center + (0.25 * *state.symbol_length as f32) as usize;
let early_sample = self.window.as_slice()[early_index];
let late_sample = self.window.as_slice()[late_index];
let early_sample = state.window[early_index];
let late_sample = state.window[late_index];
let error = (late_sample - early_sample) * sample;
let correction = self.loop_filter.next(error);
let error = (late_sample - early_sample) * sample;
let correction = state.loop_filter.next(error);
// Figure out next sample location
self.next_sample +=
(self.symbol_length as f32 + correction.to_f32().unwrap()).max(0.);
// Figure out next sample location
*state.next_sample +=
(*state.symbol_length as f32 + correction.to_f32().unwrap()).max(0.);
// Turn everything back relative to current sample
self.next_sample -= self.window_location as f32;
self.window_location = 0;
// Turn everything back relative to current sample
*state.next_sample -= *state.window_location as f32;
*state.window_location = 0;
tag = Some(Tag::with_entry(self.symbol_tag_key.clone(), error));
}
tag = Some(Tag::with_entry(state.symbol_tag_key.clone(), error));
}
(sample, tag).into()
}));
Some((sample, tag).into())
oxydsp_flowgraph::block::BlockResult::Ok
}
}

View File

@ -2,3 +2,4 @@ pub mod adapters;
pub mod channels;
pub mod iter;
pub mod squelch;
pub mod graph_control;

View File

@ -1,14 +1,12 @@
use std::iter::FusedIterator;
use std::mem::MaybeUninit;
use oxydsp_flowgraph::BlockIO;
use oxydsp_flowgraph::block::Block;
use oxydsp_flowgraph::block::BlockResult;
use oxydsp_flowgraph::block::SyncBlock;
use oxydsp_flowgraph::io::In;
use oxydsp_flowgraph::io::Out;
use oxydsp_flowgraph::io::PopIterable;
use oxydsp_flowgraph::io::stream;
use oxydsp_flowgraph::sync_block;
use oxydsp_flowgraph::tag::Tag;
use oxydsp_flowgraph::tag::TagMergable;
use oxydsp_flowgraph::tag::Tagged;
@ -27,7 +25,7 @@ pub struct Map<I: 'static, O: 'static, F>
impl<I: 'static, O: 'static, F> Map<I, O, F>
where
F: Fn(I) -> O,
F: FnMut(I) -> O,
{
pub fn new(input: In<I>, map: F) -> (Self, In<O>)
{
@ -38,15 +36,12 @@ where
impl<I: 'static, O: 'static, F> Block for Map<I, O, F>
where
F: Fn(I) -> O,
F: FnMut(I) -> O,
{
fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult
{
self.output.push_iter(
self.input
.pop_iter()
.map(|x| ((&self.map)(x.0), x.1).into()),
);
self.output
.push_iter(self.input.iter().map(|x| ((&mut self.map)(x.0), x.1).into()));
BlockResult::Ok
}
}
@ -80,12 +75,12 @@ where
{
fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult
{
let writer = self.output.write();
let reader = self.input.read();
let mut writer = self.output.write_push();
let mut reader = self.input.iter();
for _ in 0..(writer.len().min(reader.len()))
{
let (input, tag_opt) = reader.pop().unwrap().into();
let (input, tag_opt) = reader.next().unwrap().into();
let (output, result) = (self.map)(input);
let _ = writer.push((output, tag_opt).into());
match result
@ -131,12 +126,12 @@ where
{
fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult
{
let writer = self.output.write();
let reader = self.input.read();
let mut writer = self.output.write_push();
let mut reader = self.input.iter();
for _ in 0..(writer.len().min(reader.len()))
{
let (input, tag_opt) = reader.pop().unwrap().into();
let (input, tag_opt) = reader.next().unwrap().into();
let (tagged_out, result) = (self.map)((input, tag_opt.clone()).into());
let (output, tag_out) = tagged_out.into();
@ -156,10 +151,9 @@ where
}
#[derive(BlockIO)]
#[sync_block]
pub struct Scan<I: 'static, O: 'static, S, F>
where
F: Fn(&mut S, I) -> O,
F: FnMut(&mut S, I) -> O,
{
#[input]
input: In<I>,
@ -174,7 +168,7 @@ where
impl<I: 'static, O: 'static, S, F> Scan<I, O, S, F>
where
F: Fn(&mut S, I) -> O,
F: FnMut(&mut S, I) -> O,
{
pub fn new(input: In<I>, initial_state: S, map: F) -> (Self, In<O>)
{
@ -191,21 +185,24 @@ where
}
}
impl<'view, I, O, S, F> SyncBlock<'view> for Scan<I, O, S, F>
impl<I, O, S, F> Block for Scan<I, O, S, F>
where
I: 'static,
O: 'static,
S: 'view,
F: Fn(&mut S, I) -> O + 'view,
F: FnMut(&mut S, I) -> O,
{
fn sync_work(state: Self::StateView, input: Self::Input) -> Option<Self::Output>
fn work(&mut self) -> BlockResult
{
Some((*state.map)(state.state, input))
self.output.push_iter(
self.input
.iter()
.map(|x| ((self.map)(&mut self.state, x.0), x.1).into()),
);
BlockResult::Ok
}
}
#[derive(BlockIO)]
#[sync_block(tagged)]
pub struct ScanTagged<I: 'static, O: 'static, S, F>
where
F: Fn(&mut S, Tagged<I>) -> Tagged<O>,
@ -240,16 +237,20 @@ where
}
}
impl<'view, I, O, S, F> SyncBlock<'view> for ScanTagged<I, O, S, F>
impl<I, O, S, F> Block for ScanTagged<I, O, S, F>
where
I: 'static,
O: 'static,
S: 'view,
F: Fn(&mut S, Tagged<I>) -> Tagged<O> + 'view,
F: Fn(&mut S, Tagged<I>) -> Tagged<O>,
{
fn sync_work(state: Self::StateView, input: Self::Input) -> Option<Self::Output>
fn work(&mut self) -> BlockResult
{
Some((*state.map)(state.state, input))
self.output.push_iter(self.input.iter().map(|tagged| {
let cloned_tag = tagged.1.clone();
let (output, tag) = (self.map)(&mut self.state, (tagged.0, cloned_tag).into()).into();
(output, Tag::from_tag_opts([&tagged.1, &tag].into_iter())).into()
}));
BlockResult::Ok
}
}
@ -289,15 +290,15 @@ impl<T: Clone + 'static> Block for Repeat<T>
{
fn work(&mut self) -> BlockResult
{
let writer = self.output.write();
let reader = self.input.read();
let mut writer = self.output.write_push();
let mut reader = self.input.iter();
let len = writer.len().min(reader.len() / self.repetitions + 1);
for _ in 0..len
{
if self.remaining == 0 || self.current.is_none()
{
if let Some(x) = reader.pop()
if let Some(x) = reader.next()
{
self.current = Some(x.into());
self.remaining = self.repetitions;
@ -325,7 +326,6 @@ impl<T: Clone + 'static> Block for Repeat<T>
}
#[derive(BlockIO)]
#[sync_block]
pub struct NullSink<T: 'static>
{
#[input]
@ -340,12 +340,12 @@ impl<T: 'static> NullSink<T>
}
}
impl<'view, I: 'static> SyncBlock<'view> for NullSink<I>
impl<I: 'static> Block for NullSink<I>
{
fn sync_work(_: Self::StateView, _: Self::Input) -> Option<Self::Output>
fn work(&mut self) -> BlockResult
{
// Don't do shit !
Some(())
self.input.iter().for_each(|_| ());
BlockResult::Ok
}
}
@ -384,12 +384,9 @@ impl<T: 'static + Clone> Block for Tee<T>
{
fn work(&mut self) -> BlockResult
{
let writer_a = self.output_a.write();
let writer_b = self.output_b.write();
for x in self
.input
.pop_iter()
.take(writer_a.len().min(writer_b.len()))
let mut writer_a = self.output_a.write_push();
let mut writer_b = self.output_b.write_push();
for x in self.input.iter().take(writer_a.len().min(writer_b.len()))
{
let _ = writer_a.push(x.clone());
let _ = writer_b.push(x);
@ -447,8 +444,8 @@ where
{
fn work(&mut self) -> BlockResult
{
let writer = self.output.write();
let reader = self.input.read();
let mut writer = self.output.write_push();
let mut reader = self.input.iter();
let max_write = writer.len();
let mut written = 0;
@ -473,7 +470,7 @@ where
if self.current_iter.is_none()
{
// Get input
if let Some(input) = reader.pop()
if let Some(input) = reader.next()
{
let mut new_iter = (self.map)(input.0).into_iter();
if let Some(first_elt) = new_iter.next()
@ -500,3 +497,122 @@ where
BlockResult::Ok
}
}
#[derive(BlockIO)]
pub struct Splitter<T: 'static + Clone, const N: usize>
{
#[input]
input: In<T>,
#[output]
outputs: [Out<T>; N],
}
impl<T: 'static + Clone, const N: usize> Splitter<T, N>
{
pub fn new(input: In<T>) -> (Self, [In<T>; N])
{
const { assert!(N > 0) }
let (outs, ins) = oxydsp_flowgraph::io::streams();
(
Self {
input,
outputs: outs,
},
ins,
)
}
}
impl<T: 'static + Clone, const N: usize> Block for Splitter<T, N>
{
fn work(&mut self) -> BlockResult
{
let mut input_iter = self.input.iter();
let mut outputs = self
.outputs
.iter_mut()
.map(|x| x.write_push())
.collect::<Vec<_>>();
let length = input_iter
.len()
.min(outputs.iter().map(|x| x.len()).min().unwrap());
for _ in 0..length
{
let pulled = input_iter.next().unwrap();
outputs.iter_mut().for_each(|x| {
let _ = x.push(pulled.clone());
});
}
BlockResult::Ok
}
}
#[derive(BlockIO)]
pub struct Merger<T: 'static, const N: usize>
{
#[input]
inputs: [In<T>; N],
#[output]
output: Out<[T; N]>,
}
impl<T: 'static, const N: usize> Merger<T, N>
{
pub fn new(inputs: [In<T>; N]) -> (Self, In<[T; N]>)
{
const { assert!(N > 0) }
let (out, inp) = oxydsp_flowgraph::io::stream();
(
Self {
inputs,
output: out,
},
inp,
)
}
}
impl<T: Sized + 'static, const N: usize> Block for Merger<T, N>
{
fn work(&mut self) -> BlockResult
{
let mut inputs = self.inputs.iter_mut().map(|x| x.iter()).collect::<Vec<_>>();
let mut output = self.output.write_push();
let len = inputs
.len()
.min(inputs.iter().map(|x| x.len()).min().unwrap());
let mut datas: [_; N] = std::array::from_fn(|_| MaybeUninit::uninit());
let mut tags: [_; N] = std::array::from_fn(|_| MaybeUninit::uninit());
for _ in 0..len
{
for (i, (data, tag)) in inputs
.iter_mut()
.map(|x| x.next().unwrap().into())
.enumerate()
{
datas[i] = MaybeUninit::new(data);
tags[i] = MaybeUninit::new(tag);
}
let ok_datas: [_; N] = unsafe {
std::array::from_fn(|i| std::mem::replace(&mut datas[i], MaybeUninit::uninit()).assume_init())
};
let ok_tags = unsafe {
std::mem::transmute::<&[MaybeUninit<Option<Tag>>; N], &[Option<Tag>; N]>(&tags)
};
let tag = Tag::from_tag_opts(ok_tags.into_iter());
output.push((ok_datas, tag).into());
}
BlockResult::Ok
}
}

View File

@ -1,3 +1,4 @@
use std::fmt::Debug;
use std::sync::mpsc::Receiver;
use std::sync::mpsc::Sender;
use std::sync::mpsc::SyncSender;
@ -7,7 +8,6 @@ use oxydsp_flowgraph::block::Block;
use oxydsp_flowgraph::block::BlockResult;
use oxydsp_flowgraph::io::In;
use oxydsp_flowgraph::io::Out;
use oxydsp_flowgraph::io::PopIterable;
use oxydsp_flowgraph::io::stream;
#[derive(BlockIO)]
@ -45,20 +45,13 @@ impl<Tx, I: 'static> TxSink<Tx, I>
}
}
impl<I: 'static> Block for RxSource<Receiver<I>, I>
impl<I: 'static + Debug> Block for RxSource<Receiver<I>, I>
{
fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult
{
if self
.output
.push_iter(self.input.try_iter().map(|x| (x, None).into()))
{
BlockResult::Ok
}
else
{
BlockResult::Terminated
}
self.output
.push_iter(self.input.try_iter().map(|x| (x, None).into()));
BlockResult::Ok
}
}
@ -68,7 +61,7 @@ impl<I: 'static> Block for TxSink<Sender<I>, I>
{
if self
.input
.pop_iter()
.iter()
.map(|x| self.output.send(x.0))
.any(|res| res.is_err())
{
@ -87,7 +80,7 @@ impl<I: 'static> Block for TxSink<SyncSender<I>, I>
{
if self
.input
.pop_iter()
.iter()
.map(|x| self.output.send(x.0))
.any(|res| res.is_err())
{

View File

@ -0,0 +1,53 @@
use oxydsp_flowgraph::{BlockIO, block::Block, io::{In, Out}, tag::TagKey};
#[derive(BlockIO)]
pub struct GraphKiller<T: 'static, K: Send>
{
#[input]
input: In<T>,
#[output]
output: Out<T>,
kill_tag: TagKey<K>
}
impl<T: 'static, K: Send + Send + 'static> GraphKiller<T, K>
{
pub fn new(input: In<T>, kill_on: TagKey<K>) -> (Self, In<T>)
{
let (output, port) = oxydsp_flowgraph::io::stream();
(
GraphKiller
{
input,
output,
kill_tag: kill_on,
},
port
)
}
}
impl<T: 'static, K: Send + Sync + 'static> Block for GraphKiller<T, K>
{
fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult
{
let mut reader = self.input.iter();
let mut writer = self.output.write_push();
for _ in 0..reader.len().min(writer.len())
{
let (data, tag) = reader.next().unwrap().into();
if tag.as_ref().is_some_and(|t| t.retrieve(&self.kill_tag).is_some())
{
return oxydsp_flowgraph::block::BlockResult::Exit;
}
let _ = writer.push((data, tag).into());
}
oxydsp_flowgraph::block::BlockResult::Ok
}
}

View File

@ -52,7 +52,7 @@ where
{
fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult
{
let writer = self.output.write();
let mut writer = self.output.write_push();
for _ in 0..writer.len()
{

View File

@ -1,13 +1,16 @@
use std::{collections::VecDeque, iter::Sum};
use std::collections::VecDeque;
use std::iter::Sum;
use num::{Float, FromPrimitive, One, Zero, complex::ComplexFloat};
use oxydsp_flowgraph::{
BlockIO,
block::{Block, BlockResult},
io::{In, Out, PopIterable},
};
use crate::filtering::fir::{Fir, FirFilter};
use num::Float;
use num::FromPrimitive;
use num::One;
use num::Zero;
use num::complex::ComplexFloat;
use oxydsp_flowgraph::BlockIO;
use oxydsp_flowgraph::block::Block;
use oxydsp_flowgraph::block::BlockResult;
use oxydsp_flowgraph::io::In;
use oxydsp_flowgraph::io::Out;
#[derive(BlockIO)]
pub struct Squelch<T>
@ -57,8 +60,8 @@ where
{
fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult
{
let writer = self.output.write();
for x in self.input.pop_iter().take(writer.len())
let mut writer = self.output.write_push();
for x in self.input.iter().take(writer.len())
{
let (element, tag) = x.into();

View File

@ -1 +1,2 @@
pub mod fir;
pub mod history_buf;

View File

@ -1,29 +1,75 @@
use std::collections::VecDeque;
use std::f64::consts::PI;
use std::iter::Sum;
use std::ops::Div;
use std::ops::Mul;
use std::process::Output;
use num::Complex;
use num::Float;
use num::FromPrimitive;
use num::Num;
use num::One;
use num::Zero;
use num::complex::ComplexFloat;
use num::zero;
use rustfft::FftNum;
use rustfft::FftPlanner;
use std::f64::consts::PI;
use std::iter::Sum;
use std::ops::Add;
use std::ops::Div;
use std::ops::Mul;
use crate::filtering::history_buf::HistoryBuf;
use crate::map;
use crate::units::DigitalFrequency;
/// Finite impulse response
/// Represents a finite impulse response as a vector
/// of values in time.
///
/// Convention
/// indices : 0 ----------------- fir.0.len - 1
/// time : ---------------->
///
/// For a reverb ir for example the clap would be at index 0
/// and the reverb tail towards the end of the vector.
#[derive(Clone)]
pub struct Fir<T>(pub Vec<T>);
impl<T> Fir<T>
where
T: Num + Clone + Sum,
{
pub fn convoluted_with(&self, other: &Fir<T>) -> Fir<T>
{
// Perform convolution
let mut new_fir = vec![];
let mut filter = FirFilter::<T, T, T>::new(self.clone());
for x in self.0.iter().rev()
{
new_fir.push(filter.next(x.clone()));
}
for _ in 0..other.0.len()
{
new_fir.push(filter.next(T::zero()));
}
Fir(new_fir)
}
}
impl<T> Fir<Complex<T>>
where
T: FftNum + Float + Clone,
{
/// Synthesizes an impulse response from a transfer function using an inverse fourrier
/// transform.
///
/// The input units are thus :
/// Transfer function :
/// start center end
/// [ ]
/// frequency :
/// 0 pi 2*pi
/// = pi, -pi 0
/// nyquist's frequency
/// (As the frequencies are periodic)
///
pub fn from_transfer_function(tf: impl AsRef<[Complex<T>]>) -> Fir<Complex<T>>
{
let mut planner = FftPlanner::new();
@ -43,6 +89,12 @@ where
Fir(shifted_fir)
}
/// Creates a low pass filter with the following ideal transfer function using the ifft method:
///
/// ________________ ________________
/// |____________________|
/// 0 cuttoff -cuttoff 0
///
pub fn lowpass(cutoff: DigitalFrequency, length: usize) -> Fir<Complex<T>>
{
let mut tf = vec![Complex::<T>::zero(); length];
@ -63,6 +115,8 @@ where
T: ComplexFloat + Div<T::Real, Output = T> + Copy + Sum,
T::Real: Float,
{
/// Returns the same impulse response
/// normalized by the length of the sum of the vectors.
pub fn normalized(mut self) -> Self
{
let sum: T = self.0.iter().copied().sum();
@ -73,43 +127,218 @@ where
}
}
impl<T> Fir<T>
where
T: ComplexFloat + Div<T::Real, Output = T>,
T::Real: Float + FromPrimitive,
{
/// Returns the same impulse response
/// normalized by the length of the impulse response.
pub fn normalized_len(mut self) -> Self
{
let len = T::Real::from_usize(self.0.len()).unwrap();
self.0.iter_mut().for_each(|x| *x = *x / len);
self
}
}
impl<T> Fir<T>
where
T: ComplexFloat + Div<T::Real, Output = T> + Copy + Add<T, Output = T>,
T::Real: Float,
{
/// Returns the same impulse response
/// normalized by the energy or the sum of the squares of the magnitues
/// of the impulse response
pub fn normalized_sqr(mut self) -> Self
{
let sum = self
.0
.iter()
.copied()
.map(|x| x.abs() * x.abs())
.reduce(|x, y| x + y)
.unwrap();
self.0.iter_mut().for_each(|x| *x = *x / sum);
self
}
}
impl<T> Fir<T>
where
T: ComplexFloat + FromPrimitive,
{
/// Creates a square unit impulse response :
/// a vector of length `length` filled with ones
pub fn square(length: usize) -> Self
{
Self((0..length).map(|_| T::one()).collect())
}
/// Creates a simple proportional integral (PI) loop impulse response :
///
/// FIR:
/// ```text
/// _ ................................... Kp (`proportional_gain`)
/// |
/// |
/// |
///  |____________________________ ...... Ki (`integral_gain`)
///    |
///
/// 0 ------------------------- `length`
/// ```
///
pub fn proportional_integral(length: usize, proportional_gain: T, integral_gain: T) -> Self
{
Self(
(0..length)
.map(|i| {
if i == 0
{
proportional_gain + integral_gain
}
else
{
integral_gain
}
})
.collect(),
)
}
/// Creates a root raised cosine (RRC) FIR of length `length`
/// with the given roll off factor.
///
/// The corresponding RC (convolution of this filter with itself)
/// has its zero crossing every `symbol_length` samples (except at 0).
pub fn root_raised_cosine(length: usize, roll_off: f64, symbol_length: usize) -> Self
{
Self(
(0..length)
.map(|i| {
let t = map(
i as f64,
0.,
length as f64,
-(length as f64) * 0.5,
length as f64 * 0.5,
) / symbol_length as f64;
T::from_f64(root_raised_cosine(t, roll_off, 1.)).unwrap()
})
.collect(),
)
}
/// Creates a gaussion fir of length `length`
/// The maximum amplitude of the fir is 1.
/// The gaussian curve is centered in the fir.
/// The parameter `standard_deviations` specifies how many multiples of the
/// standard deviations is on the left and right of the center before the fir cuts it off.
pub fn gaussian(length: usize, standard_deviations: f32) -> Self
{
Self(
(0..length)
.map(|x| {
let t = map(
x as f64,
0.,
length as f64,
-standard_deviations as f64,
standard_deviations as f64,
);
// Gaussian with sd=1
let sq = t / 2.;
T::from_f64(-sq * sq).unwrap().exp()
})
.collect(),
)
}
}
/// A simple convolutional finite impulse response filter
pub struct FirFilter<F, T, O>
where
F: Mul<T, Output = O>,
O: Sum,
O: Add<O, Output = O> + Sum + Clone + Zero,
{
fir: Vec<F>,
taps: VecDeque<T>,
//taps: VecDeque<T>,
taps: HistoryBuf<T>,
}
impl<F, T, O> FirFilter<F, T, O>
where
T: Clone,
T: Clone + Zero,
F: Mul<T, Output = O> + Clone,
O: Sum,
O: Add<O, Output = O> + Sum + Clone + Zero,
{
/// Creates a filter with the given impulse response
pub fn new(impulse_response: Fir<F>) -> Self
{
let len = impulse_response.0.len();
Self {
fir: impulse_response.0,
taps: VecDeque::with_capacity(len),
taps: HistoryBuf::new(T::zero(), len),
}
}
/// Inserts a new sample in the filter and get its new value
///
/// At the beginning, the delay line starts with zeroes.
pub fn next(&mut self, input: T) -> O
{
if self.taps.len() == self.fir.len()
{
self.taps.pop_front();
}
self.taps.push_back(input);
self.insert(input);
self.filtered()
}
self.fir
.iter()
.zip(self.taps.iter())
.map(|(a, b)| a.clone() * b.clone())
.sum()
pub fn insert(&mut self, input: T)
{
self.taps.push(input);
}
pub fn insert_batch_ref(&mut self, input: &[T])
{
for x in input.iter().cloned()
{
self.taps.push(x);
}
}
/// Gets the current value of the filter
/// given the sampels that it currently holds
pub fn filtered(&self) -> O
{
let taps = self.taps.as_slice();
Self::dot_prod(&self.fir, taps)
}
pub fn dot_prod(a: &[F], b: &[T]) -> O
{
assert_eq!(a.len(), b.len());
let mut sum: [_; 4] = [O::zero(), O::zero(), O::zero(), O::zero()];
let (a_chunks, a_remainder) = a.as_chunks::<4>();
let (b_chunks, b_remainder) = b.as_chunks::<4>();
for (x, y) in a_chunks.iter().zip(b_chunks.iter())
{
sum[0] = sum[0].clone() + x[0].clone() * y[0].clone();
sum[1] = sum[1].clone() + x[1].clone() * y[1].clone();
sum[2] = sum[2].clone() + x[2].clone() * y[2].clone();
sum[3] = sum[3].clone() + x[3].clone() * y[3].clone();
}
let mut sum = sum[0].clone() + sum[1].clone() + sum[2].clone() + sum[3].clone();
for (x, y) in a_remainder.iter().zip(b_remainder.iter())
{
sum = sum + x.clone() * y.clone();
}
sum
}
}
@ -118,3 +347,31 @@ pub fn estimate_fir_length(transition_width: f64, sample_rate: f64) -> f64
{
3.8 * sample_rate / transition_width
}
/// Root raised cosine function
pub fn root_raised_cosine(t: f64, beta: f64, symbol_time: f64) -> f64
{
let eps = 1e-8;
if t.abs() < eps
{
// t = 0 special case
return (1.0 / symbol_time.sqrt()) * (1.0 + beta * (4.0 / PI - 1.0));
}
if beta > 0.0 && (t.abs() - symbol_time / (4.0 * beta)).abs() < eps
{
// t = ±T / (4β) special case
let term1 = (1.0 + 2.0 / PI) * (PI / (4.0 * beta)).sin();
let term2 = (1.0 - 2.0 / PI) * (PI / (4.0 * beta)).cos();
return (beta / (symbol_time.sqrt() * 2.0_f64.sqrt())) * (term1 + term2);
}
// General case
let numerator = (PI * t * (1.0 - beta) / symbol_time).sin()
+ 4.0 * beta * t / symbol_time * (PI * t * (1.0 + beta) / symbol_time).cos();
let denominator = PI * t * (1.0 - (4.0 * beta * t / symbol_time).powi(2)) / symbol_time;
(1.0 / symbol_time.sqrt()) * (numerator / denominator)
}

View File

@ -0,0 +1,48 @@
/// A queue that always contains the same amount of elements.
///
/// It intuitively record the history of `length` values of a value
/// This implementations allows to get a single contiguous slice view on the history
pub struct HistoryBuf<T>
{
buffer: Box<[T]>,
start: usize,
length: usize,
}
impl<T: Clone> HistoryBuf<T>
{
pub fn new(default: T, length: usize) -> Self
{
Self
{
buffer: vec![default; 2 * length].into_boxed_slice(),
start: 0,
length
}
}
pub fn from_fn(length: usize, mut f: impl FnMut(usize) -> T) -> Self
{
Self
{
buffer: (0..(2 * length)).map(|i| f(i)).collect(),
start: 0,
length
}
}
pub fn push(&mut self, data: T)
{
self.buffer[self.start] = data.clone();
self.buffer[self.start + self.length] = data.clone();
self.start += 1;
self.start %= self.length;
}
pub fn as_slice(&self) -> &[T]
{
&self.buffer[self.start..(self.start + self.length)]
}
}

View File

@ -5,6 +5,8 @@ pub mod filtering;
pub mod synthesis;
pub mod units;
/// Maps a float from a range onto another
/// linearly
pub fn map<T: Float>(x: T, x_min: T, x_max: T, o_min: T, o_max: T) -> T
{
((x - x_min) / (x_max - x_min)) * (o_max - o_min) + o_min

View File

@ -8,6 +8,21 @@ use crate::map;
use crate::units::DigitalFrequency;
use crate::units::Phase;
/// Numericaly controlled oscillator
///
/// ```
/// let nco: Nco<f32> = DigitalFrequency::from_rad(2 * f32::PI).into();
/// // Or
/// let nco: Nco<f32> = Nco::from(DigitalFrequency::from_rad(2 * f32::PI));
///
/// // Rotates by 90deg per sample
/// // The next function is from the iterator implementation
/// assert_eq!(nco.next(), Complex::new(1., 0.));
/// assert_eq!(nco.next(), Complex::new(0., 1.));
/// assert_eq!(nco.next(), Complex::new(-1., 0.));
/// assert_eq!(nco.next(), Complex::new(0., -1.));
/// assert_eq!(nco.next(), Complex::new(1., 0.));
/// ```
#[derive(Clone, Copy)]
pub struct Nco<T>
{
@ -21,6 +36,7 @@ pub struct Nco<T>
impl<T> Nco<T>
{
/// Creates a new Nco with a specific frequency starting at phase 0
pub fn new(frequency: DigitalFrequency) -> Self
{
Self {
@ -30,11 +46,13 @@ impl<T> Nco<T>
}
}
/// Gets the current frequency of the oscillator
pub fn frequency(&self) -> DigitalFrequency
{
DigitalFrequency(self.d_phase)
}
/// Creates a new Nco with a specific frequency and starting phase
pub fn with_phase(frequency: DigitalFrequency, phase: Phase) -> Self
{
Self {
@ -44,16 +62,19 @@ impl<T> Nco<T>
}
}
/// Sets the current phase.
pub fn set_phase(&mut self, phase: Phase)
{
self.phase = phase.0.0;
}
/// Sets the current phase
pub fn set_frequency(&mut self, frequency: DigitalFrequency)
{
self.d_phase = frequency.0;
}
/// Steps the oscillator by one sample
pub fn step(&mut self)
{
let (new, _) = self.phase.overflowing_add(self.d_phase);
@ -63,6 +84,8 @@ impl<T> Nco<T>
impl<T: Float + From<f32>> Nco<T>
{
/// Gets the current value of the oscillator as a
/// complex number
pub fn sample(&self) -> Complex<T>
{
let t = map(

View File

@ -3,15 +3,21 @@ use std::ops::Neg;
use crate::map;
// Represents digital frequency
/// Represents a digital, sampled frequency
/// as radians per samples in [0; 2*pi[ range
/// mapped to the whole [0; usize::MAX] range
#[derive(Clone, Copy, PartialEq, PartialOrd)]
pub struct DigitalFrequency(pub usize);
/// Represents an absolute phase offset
/// as radians in [0; 2*pi[ range
/// mapped to the whole [0; usize::MAX] range
#[derive(Clone, Copy, PartialEq, PartialOrd)]
pub struct Phase(pub DigitalFrequency);
impl DigitalFrequency
{
/// Creates a DigitalFrequency from rads per samples
pub fn from_rad(radians_per_sample: f64) -> Self
{
// Frequnecy wraps arround : Going at 2 pi radians per second
@ -22,16 +28,21 @@ impl DigitalFrequency
DigitalFrequency(map(f, 0., 2. * PI, 0., usize::MAX as f64).floor() as usize)
}
/// Creates a DigitalFrequency from a frequency given in hertz (s^(-1))
/// in the context of a sample rate also given in hertz
pub fn from_time_frequency(hertz: f64, sample_rate: f64) -> Self
{
Self::from_rad(map(hertz, 0., sample_rate, 0., 2. * PI))
}
/// Gets the frequency as radians per sample
pub fn as_rad(&self) -> f64
{
map(self.0 as f64, 0., usize::MAX as f64, 0., 2. * PI)
}
/// Gets the frequency as hertz in the context of a sample rate
/// also given in hert
pub fn as_time_frequency(&self, sample_rate: f64) -> f64
{
map(self.0 as f64, 0., usize::MAX as f64, 0., sample_rate)
@ -42,6 +53,7 @@ impl Neg for DigitalFrequency
{
type Output = Self;
/// Returns the "negative frequency"
fn neg(self) -> Self::Output
{
DigitalFrequency(usize::MAX - self.0)

View File

@ -4,4 +4,5 @@ version = "0.1.0"
edition = "2024"
[dependencies]
crossbeam-deque = "0.8.6"
oxydsp-flowgraph-macros = { path = "./oxydsp-flowgraph-macros" }

View File

@ -1,19 +1,11 @@
use proc_macro::TokenStream;
use zyn::FromInput;
use zyn::ToTokens;
use zyn::ext::AttrExt;
use zyn::ext::FieldsExt;
use zyn::ext::ItemExt;
use zyn::format_ident;
use zyn::ident;
use zyn::parse_input;
use zyn::syn::Attribute;
use zyn::syn::GenericParam;
use zyn::syn::Index;
use zyn::syn::Lit;
use zyn::syn::TypeGenerics;
use zyn::syn::parse_quote;
use zyn::syn::punctuated::Punctuated;
use zyn::syn::spanned::Spanned;
mod sync;
@ -52,53 +44,67 @@ pub fn block_io(
impl {{impl_generics}} oxydsp_flowgraph::block::BlockIO for {{ ident.clone() }} {{ type_generics }}
{{ where_clause }}
{
@block_io_set_index(fields = fields.clone())
@block_io_get_successors(fields = fields.clone())
@block_io_counts(fields = fields.clone())
@block_io_set_streams(fields = fields.clone())
@block_io_create_stream(fields = fields.clone())
@block_io_get_inputs(fields = fields.clone())
@block_io_get_outputs(fields = fields.clone())
@block_io_get_meta(ident = ident.clone(), fields = fields.clone())
}
)
}
#[zyn::element]
fn block_io_set_index(fields: zyn::syn::Fields) -> zyn::TokenStream
fn block_io_get_inputs(fields: zyn::syn::Fields) -> zyn::TokenStream
{
let fields = fields.as_named().unwrap().named.clone();
zyn::zyn!(
fn set_index(&self, block_index: usize)
fn get_inputs_mut(&mut self) -> Vec<&mut dyn oxydsp_flowgraph::io::edge::AnonymousIn>
{
use oxydsp_flowgraph::edge::BlockIOIndex;
let mut acc = vec![];
use oxydsp_flowgraph::block::BlockInput;
@for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("input"))).enumerate())
{
self.{{field.1.ident}}.set_block_index(BlockIOIndex {block_index, port_index: {{ field.0 }} });
acc.extend(self.{{field.1.ident}}.get_inputs_mut());
}
acc
}
@for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("output"))).enumerate())
fn get_inputs(&self) -> Vec<&dyn oxydsp_flowgraph::io::edge::AnonymousIn>
{
let mut acc = vec![];
use oxydsp_flowgraph::block::BlockInput;
@for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("input"))).enumerate())
{
self.{{field.1.ident}}.set_block_index(BlockIOIndex {block_index, port_index: {{ field.0 }} });
acc.extend(self.{{field.1.ident}}.get_inputs());
}
acc
}
)
}
#[zyn::element]
fn block_io_get_successors(fields: zyn::syn::Fields) -> zyn::TokenStream
fn block_io_get_outputs(fields: zyn::syn::Fields) -> zyn::TokenStream
{
let fields = fields.as_named().unwrap().named.clone();
zyn::zyn!(
fn get_successors(&self) -> Vec<oxydsp_flowgraph::edge::BlockIOIndex>
fn get_outputs_mut(&mut self) -> Vec<&mut dyn oxydsp_flowgraph::io::edge::AnonymousOut>
{
let mut output = vec![];
let mut acc = vec![];
use oxydsp_flowgraph::block::BlockOutput;
@for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("output"))).enumerate())
{
if let Some(block_index) = self.{{ field.1.ident }}.get_consumer_block()
{
output.push(block_index);
}
acc.extend(self.{{field.1.ident}}.get_outputs_mut());
}
output
acc
}
fn get_outputs(&self) -> Vec<&dyn oxydsp_flowgraph::io::edge::AnonymousOut>
{
let mut acc = vec![];
use oxydsp_flowgraph::block::BlockOutput;
@for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("output"))).enumerate())
{
acc.extend(self.{{field.1.ident}}.get_outputs());
}
acc
}
)
}
@ -135,103 +141,16 @@ fn block_io_get_meta(ident: zyn::syn::Ident, fields: zyn::syn::Fields) -> zyn::T
fn get_output_type_names(&self) -> Vec<&'static str>
{
let mut output = Vec::new();
use oxydsp_flowgraph::block::BlockOutput;
@for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("output"))).enumerate())
{
output.push(self.{{ field.1.ident.clone() }}.get_type_name());
output.extend(self.{{ field.1.ident.clone() }}.get_type_names());
}
return output;
}
)
}
#[zyn::element]
fn block_io_counts(fields: zyn::syn::Fields) -> zyn::TokenStream
{
let fields = fields.as_named().unwrap().named.clone();
let input_count = fields
.iter()
.filter(|x| x.attrs.iter().any(|x| x.is("input")))
.count();
let output_count = fields
.iter()
.filter(|x| x.attrs.iter().any(|x| x.is("output")))
.count();
zyn::zyn!(
fn input_count(&self) -> usize
{
return { { input_count } };
}
fn output_count(&self) -> usize
{
return { { output_count } };
}
)
}
#[zyn::element(debug = "pretty")]
fn block_io_set_streams(fields: zyn::syn::Fields) -> zyn::TokenStream
{
zyn::zyn!(
#[allow(unreachable_code)]
fn set_anonymous_out_stream(
&mut self,
output_index: usize,
producer: oxydsp_flowgraph::io::AnonymousStreamProducer,
)
{
match output_index
{
@for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("output"))).enumerate())
{
{{ field.0 }} => self.{{field.1.ident}}.set_anonymous_stream(producer),
}
_ => panic!("output_index out of bounds.")
};
}
#[allow(unreachable_code)]
fn set_anonymous_in_stream(&mut self, input_index: usize, consumer: oxydsp_flowgraph::io::AnonymousStreamConsumer)
{
match input_index
{
@for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("input"))).enumerate())
{
{{ field.0 }} => self.{{field.1.ident}}.set_anonymous_stream(consumer),
}
_ => panic!("output_index out of bounds.")
};
}
)
}
#[zyn::element]
fn block_io_create_stream(fields: zyn::syn::Fields) -> zyn::TokenStream
{
zyn::zyn!(
#[allow(unreachable_code)]
fn create_anonymous_stream_for(
&mut self,
output_index: usize,
capacity: usize,
) -> (
oxydsp_flowgraph::io::AnonymousStreamProducer,
oxydsp_flowgraph::io::AnonymousStreamConsumer,
)
{
let output = match output_index
{
@for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("output"))).enumerate())
{
{{ field.0 }} => self.{{ field.1.ident }}.create_anonymous_stream(capacity),
}
_ => panic!("output_index out of bounds."),
};
return output;
}
)
}
#[zyn::element]
fn out_inner_type(ty: zyn::syn::Type) -> zyn::TokenStream
{

View File

@ -1,14 +1,10 @@
use zyn::Fields;
use zyn::FromInput;
use zyn::ToTokens;
use zyn::ast::at;
use zyn::ext::AttrExt;
use zyn::ext::FieldsExt;
use zyn::ext::TypeExt;
use zyn::quote::quote;
use zyn::syn::Field;
use zyn::syn::GenericParam;
use zyn::syn::Lifetime;
use zyn::syn::parse_quote;
use crate::SyncBlockConfig;

View File

@ -1,6 +1,8 @@
use crate::edge::BlockIOIndex;
use crate::io::AnonymousStreamConsumer;
use crate::io::AnonymousStreamProducer;
use crate::io::edge::AnonymousIn;
use crate::io::edge::AnonymousOut;
use crate::io::In;
use crate::io::Out;
use crate::io::edge::BlockIOIndex;
pub enum BlockResult
{
@ -15,28 +17,60 @@ pub enum BlockResult
Exit,
}
pub trait BlockInput
{
fn get_inputs_mut(&mut self) -> Vec<&mut dyn AnonymousIn>;
fn get_inputs(&self) -> Vec<&dyn AnonymousIn>;
// Meta information
fn get_type_names(&self) -> Vec<&'static str>;
}
pub trait BlockOutput
{
fn get_outputs_mut(&mut self) -> Vec<&mut dyn AnonymousOut>;
fn get_outputs(&self) -> Vec<&dyn AnonymousOut>;
// Meta information
fn get_type_names(&self) -> Vec<&'static str>;
}
pub trait BlockIO
{
fn get_inputs_mut(&mut self) -> Vec<&mut dyn AnonymousIn>;
fn get_outputs_mut(&mut self) -> Vec<&mut dyn AnonymousOut>;
fn get_inputs(&self) -> Vec<&dyn AnonymousIn>;
fn get_outputs(&self) -> Vec<&dyn AnonymousOut>;
fn get_successors(&self) -> Vec<BlockIOIndex>
{
self.get_outputs()
.iter()
.map(|x| x.get_consumer_block().unwrap())
.collect()
}
// Get all of the BlockIOIndices (block index + port) that
// this blocks can send data to.
fn get_successors(&self) -> Vec<BlockIOIndex>;
// Sets the index of the current blocks on the shared edges
fn set_index(&self, block_index: usize);
// Number of input/output ports
fn input_count(&self) -> usize;
fn output_count(&self) -> usize;
// Stream managment
fn set_anonymous_out_stream(&mut self, output_index: usize, producer: AnonymousStreamProducer);
fn set_anonymous_in_stream(&mut self, input_index: usize, consumer: AnonymousStreamConsumer);
fn create_anonymous_stream_for(
&mut self,
output_index: usize,
capacity: usize,
) -> (AnonymousStreamProducer, AnonymousStreamConsumer);
// fn get_successors(&self) -> Vec<BlockIOIndex>;
//
// // Sets the index of the current blocks on the shared edges
// fn set_index(&self, block_index: usize);
//
// // Number of input/output ports
// fn input_count(&self) -> usize;
// fn output_count(&self) -> usize;
//
// // Stream managment
// fn set_anonymous_out_stream(&mut self, output_index: usize, producer: AnonymousStreamProducer);
// fn set_anonymous_in_stream(&mut self, input_index: usize, consumer: AnonymousStreamConsumer);
//
// fn create_anonymous_stream_for(
// &mut self,
// output_index: usize,
// capacity: usize,
// ) -> (AnonymousStreamProducer, AnonymousStreamConsumer);
// Meta information
fn get_block_name(&self) -> &'static str;
@ -67,3 +101,173 @@ pub trait SyncBlock<'view>: SyncBlockIO<'view>
pub trait GraphableBlock: Block + BlockIO {}
impl<T> GraphableBlock for T where T: Block + BlockIO {}
impl<T: 'static> BlockInput for In<T>
{
fn get_inputs_mut(&mut self) -> Vec<&mut dyn AnonymousIn>
{
vec![self]
}
fn get_inputs(&self) -> Vec<&dyn AnonymousIn>
{
vec![self]
}
fn get_type_names(&self) -> Vec<&'static str>
{
vec![std::any::type_name::<T>()]
}
}
impl<I: BlockInput> BlockInput for Option<I>
{
fn get_inputs_mut(&mut self) -> Vec<&mut dyn AnonymousIn>
{
if let Some(input) = self
{
input.get_inputs_mut()
}
else
{
vec![]
}
}
fn get_inputs(&self) -> Vec<&dyn AnonymousIn>
{
if let Some(input) = self
{
input.get_inputs()
}
else
{
vec![]
}
}
fn get_type_names(&self) -> Vec<&'static str>
{
if let Some(input) = self
{
input.get_type_names()
}
else
{
vec![]
}
}
}
impl<I: BlockInput, const N: usize> BlockInput for [I; N]
{
fn get_inputs(&self) -> Vec<&dyn AnonymousIn>
{
let mut output = vec![];
for input in self
{
output.extend(input.get_inputs());
}
output
}
fn get_inputs_mut(&mut self) -> Vec<&mut dyn AnonymousIn>
{
let mut output = vec![];
for input in self
{
output.extend(input.get_inputs_mut());
}
output
}
fn get_type_names(&self) -> Vec<&'static str>
{
vec![std::any::type_name::<I>(); N]
}
}
impl<T: 'static> BlockOutput for Out<T>
{
fn get_outputs_mut(&mut self) -> Vec<&mut dyn AnonymousOut>
{
vec![self]
}
fn get_outputs(&self) -> Vec<&dyn AnonymousOut>
{
vec![self]
}
fn get_type_names(&self) -> Vec<&'static str>
{
vec![std::any::type_name::<T>()]
}
}
impl<I: BlockOutput> BlockOutput for Option<I>
{
fn get_outputs_mut(&mut self) -> Vec<&mut dyn AnonymousOut>
{
if let Some(output) = self
{
output.get_outputs_mut()
}
else
{
vec![]
}
}
fn get_outputs(&self) -> Vec<&dyn AnonymousOut>
{
if let Some(output) = self
{
output.get_outputs()
}
else
{
vec![]
}
}
fn get_type_names(&self) -> Vec<&'static str>
{
if let Some(input) = self
{
input.get_type_names()
}
else
{
vec![]
}
}
}
impl<I: BlockOutput, const N: usize> BlockOutput for [I; N]
{
fn get_outputs_mut(&mut self) -> Vec<&mut dyn AnonymousOut>
{
let mut result = vec![];
for output in self
{
result.extend(output.get_outputs_mut());
}
result
}
fn get_outputs(&self) -> Vec<&dyn AnonymousOut>
{
let mut result = vec![];
for output in self
{
result.extend(output.get_outputs());
}
result
}
fn get_type_names(&self) -> Vec<&'static str>
{
vec![std::any::type_name::<I>(); N]
}
}

View File

@ -1,20 +0,0 @@
use std::any::Any;
#[derive(Default)]
pub struct Edge
{
// Represents the index of the block owning the Out end in the graph
// And the the output index within that block
pub from: Option<BlockIOIndex>,
// Represents the index of the block owning the In end in the graph
// And the the input index within that block
pub to: Option<BlockIOIndex>,
}
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub struct BlockIOIndex
{
pub block_index: usize,
pub port_index: usize,
}

View File

@ -1,5 +0,0 @@
// Represents a FlowGrahWide, simultaneous event
pub enum FlowGraphEvent
{
Kill(String),
}

View File

@ -1,6 +1,12 @@
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::thread::JoinHandle;
use crossbeam_deque::Steal;
use crossbeam_deque::Worker;
use crate::block::GraphableBlock;
use crate::io::edge::BlockIOIndex;
#[macro_export]
macro_rules! flowgraph
@ -17,6 +23,21 @@ macro_rules! flowgraph
}
}
pub struct RunningGraph
{
worker_handles: Vec<JoinHandle<()>>,
}
impl RunningGraph
{
pub fn join(self)
{
self.worker_handles.into_iter().for_each(|j| {
let _ = j.join();
});
}
}
pub struct FlowGraph
{
blocks: Vec<Box<dyn GraphableBlock + Send + 'static>>,
@ -31,48 +52,157 @@ impl FlowGraph
pub fn add_block<T: GraphableBlock + Send + 'static>(&mut self, block: T)
{
block.set_index(self.blocks.len());
block.get_inputs().iter().enumerate().for_each(|(i, x)| {
x.set_index(BlockIOIndex {
block_index: self.blocks.len(),
port_index: i,
})
});
block.get_outputs().iter().enumerate().for_each(|(i, x)| {
x.set_index(BlockIOIndex {
block_index: self.blocks.len(),
port_index: i,
})
});
self.blocks.push(Box::new(block));
}
pub fn run(mut self) -> JoinHandle<()>
// pub fn run(mut self, thread_count: usize) -> RunningGraph
// {
// self.populate_edges();
//
// let mut worker_queues = (0..thread_count).map(|_| vec![]).collect::<Vec<_>>();
//
// for (i, block) in self.blocks.into_iter().enumerate()
// {
// worker_queues[i % thread_count].push(block);
// }
//
// let running = Arc::new(AtomicBool::new(true));
// let worker_handles = worker_queues
// .into_iter()
// .map(|mut queue| {
// let running = running.clone();
// std::thread::spawn(move || {
// 'outer: while running.load(std::sync::atomic::Ordering::Relaxed)
// {
// for block in queue.iter_mut()
// {
// match block.work()
// {
// crate::block::BlockResult::Ok =>
// {
// // Reschedule block
// }
// crate::block::BlockResult::Terminated =>
// { // DROP BLOCK
// }
// crate::block::BlockResult::Exit =>
// {
// println!("KILLING GRAPH");
// break 'outer;
// }
// }
// }
// }
// running.store(false, std::sync::atomic::Ordering::Relaxed);
// })
// })
// .collect::<Vec<_>>();
// RunningGraph { worker_handles }
// }
pub fn run(mut self, thread_count: usize) -> RunningGraph
{
self.populate_edges();
std::thread::spawn(move || {
'outer: loop
{
for x in self.blocks.iter_mut()
{
match x.work()
let worker_queues = (0..thread_count)
.map(|_| Worker::<Box<dyn GraphableBlock + Send + 'static>>::new_fifo())
.collect::<Vec<_>>();
worker_queues
.iter()
.cycle()
.zip(self.blocks)
.for_each(|(worker, block)| worker.push(block));
let stealers = worker_queues
.iter()
.map(|x| x.stealer())
.collect::<Vec<_>>();
let running = Arc::new(AtomicBool::new(true));
let worker_handles = worker_queues
.into_iter()
.map(|queue| {
let stealers = stealers.clone();
let running = running.clone();
std::thread::spawn(move || {
'outer: while running.load(std::sync::atomic::Ordering::Relaxed)
{
crate::block::BlockResult::Ok =>
{}
crate::block::BlockResult::Terminated =>
{ //break 'outer;
}
crate::block::BlockResult::Exit =>
// Try to get a job
let mut block = queue.pop().unwrap_or_else(|| {
std::iter::repeat_with(|| {
stealers
.iter()
.map(|stealer| stealer.steal_batch_and_pop(&queue))
.collect::<Steal<_>>()
.success()
})
.find(|x| x.is_some())
.unwrap()
.unwrap()
});
match block.work()
{
println!("KILLING GRAPH");
break 'outer;
crate::block::BlockResult::Ok =>
{
// Reschedule block
queue.push(block);
}
crate::block::BlockResult::Terminated =>
{ // DROP BLOCK
}
crate::block::BlockResult::Exit =>
{
println!("KILLING GRAPH");
break 'outer;
}
}
}
}
}
})
running.store(false, std::sync::atomic::Ordering::Relaxed);
})
})
.collect::<Vec<_>>();
RunningGraph { worker_handles }
}
fn populate_edges(&mut self)
{
for block_index in 0..self.blocks.len()
{
let successors = self.blocks[block_index].get_successors();
for (output_index, succ_id) in successors.iter().enumerate()
let outputs = self.blocks[block_index].get_outputs_mut();
let mut rxs = vec![];
for output in outputs.into_iter()
{
let (tx, rx) =
self.blocks[block_index].create_anonymous_stream_for(output_index, 4096);
self.blocks[block_index].set_anonymous_out_stream(output_index, tx);
self.blocks[succ_id.block_index].set_anonymous_in_stream(succ_id.port_index, rx);
//let (tx, rx) = output.create_anonymous_stream(4096);
let (tx, rx) = output.create_anonymous_stream(8192);
//let (tx, rx) = output.create_anonymous_stream(65536);
output.set_anonymous_stream(tx);
rxs.push((
output
.get_consumer_block()
.expect("Non existent destination block."),
rx,
))
}
for (index, rx) in rxs.into_iter()
{
self.blocks[index.block_index].get_inputs_mut()[index.port_index]
.set_anonymous_stream(rx);
}
}
}

View File

@ -1,21 +1,21 @@
use std::any::Any;
use std::mem::ManuallyDrop;
use std::mem::MaybeUninit;
use std::sync::Arc;
use std::sync::Mutex;
use oxydsp_flowgraph_macros::generate_pop_iterable_tuple_impl;
use oxydsp_flowgraph_macros::impl_iterator_for_pop_iter_tuple;
use crate::edge::BlockIOIndex;
use crate::edge::Edge;
use crate::stream::StreamConsumer;
use crate::stream::StreamProducer;
use crate::stream::StreamReader;
use crate::stream::StreamWriter;
use crate::stream::{self};
use crate::tag::Tag;
use crate::tag::TagSlot;
use crate::tag::Tagged;
pub mod edge;
use crate::io::edge::Edge;
/// Represents a input port for a block
pub struct In<T>
{
stream: Option<StreamConsumer<T>>,
@ -25,6 +25,7 @@ pub struct In<T>
edge: Arc<Mutex<Edge>>,
}
/// Represents a output port for a block
pub struct Out<T>
{
stream: Option<StreamProducer<T>>,
@ -34,18 +35,293 @@ pub struct Out<T>
edge: Arc<Mutex<Edge>>,
}
pub struct InReader<'a, T>
// Input Output interfaces
/// Output interface to write elements in a "push" fashion
pub struct OutPush<'a, T>
{
data_reader: StreamReader<'a, T>,
tag_reader: StreamReader<'a, TagSlot>,
data_writer: ManuallyDrop<StreamWriter<'a, T>>,
tag_writer: ManuallyDrop<StreamWriter<'a, TagSlot>>,
total_length: usize,
written_data: usize,
written_tags: usize,
start_index: usize,
}
pub struct OutWriter<'a, T>
impl<'a, T: 'static> OutPush<'a, T>
{
data_writer: StreamWriter<'a, T>,
tag_writer: StreamWriter<'a, TagSlot>,
pub fn len(&self) -> usize
{
let data_slices = self.data_writer.slices();
// This gives better performance !
// Probably because it prevents claiming the whole buffer at once
// But this is hacky. It should probably be managed at a lower level
// ((data_slices.0.len() + data_slices.1.len()) / 2) - self.written_data
data_slices.0.len() + data_slices.1.len() - self.written_data
}
pub fn is_empty(&self) -> bool
{
self.len() == 0
}
pub fn push(&mut self, data: Tagged<T>) -> Result<(), Tagged<T>>
{
// println!("\n\n\n");
if self.written_data >= self.total_length
{
return Err(data);
}
let data_slices = self.data_writer.slices_mut();
let tag_slices = self.tag_writer.slices_mut();
// Write a data
let data_ref = {
if self.written_data < data_slices.0.len()
{
&mut data_slices.0[self.written_data]
}
else
{
&mut data_slices.1[self.written_data - data_slices.0.len()]
}
};
// Index of the taken element within the stream.
*data_ref = MaybeUninit::new(data.0);
let element_index = self.start_index + self.written_data;
self.written_data += 1;
// Check for corresponding tag
let tag_ref = {
if self.written_tags < tag_slices.0.len()
{
&mut tag_slices.0[self.written_tags]
}
else
{
&mut tag_slices.1[self.written_tags - tag_slices.0.len()]
}
};
if let Some(tag) = data.1
{
*tag_ref = MaybeUninit::new(TagSlot {
position: element_index,
tag,
});
self.written_tags += 1;
}
Ok(())
}
}
impl<'a, T> Drop for OutPush<'a, T>
{
fn drop(&mut self)
{
let mut data_writer =
unsafe { ManuallyDrop::<StreamWriter<'a, T>>::take(&mut self.data_writer) };
let mut tag_writer =
unsafe { ManuallyDrop::<StreamWriter<'a, TagSlot>>::take(&mut self.tag_writer) };
tag_writer.produce(self.written_tags);
data_writer.produce(self.written_data);
}
}
pub struct InIter<'a, T>
{
data_reader: ManuallyDrop<StreamReader<'a, T>>,
tag_reader: ManuallyDrop<StreamReader<'a, TagSlot>>,
total_length: usize,
total_tag_length: usize,
read_data: usize,
read_tags: usize,
start_index: usize,
}
impl<'a, T> Iterator for InIter<'a, T>
{
type Item = Tagged<T>;
fn next(&mut self) -> Option<Self::Item>
{
if self.read_data >= self.total_length
{
return None;
}
let data_slices = self.data_reader.slices_mut();
let tag_slices = self.tag_reader.slices_mut();
// Take a data
// SAFETY:
// All takable should contain a valid element: guarteed by the queue
// We strictly monotonicly take all elements, in order, in the slices.
// No two same indices should be taken.
// We cound the number of taken elemnts and consume the correct amount from the queue
let data = {
if self.read_data < data_slices.0.len()
{
unsafe { data_slices.0[self.read_data].take() }
}
else
{
unsafe { data_slices.1[self.read_data - data_slices.0.len()].take() }
}
};
// Index of the taken element within the stream.
let element_index = self.start_index + self.read_data;
self.read_data += 1;
// Check for corresponding tag
let mut tag = None;
if self.read_tags < self.total_tag_length
{
let tag_ref = {
if self.read_tags < tag_slices.0.len()
{
&mut tag_slices.0[self.read_tags]
}
else
{
&mut tag_slices.1[self.read_tags - tag_slices.0.len()]
}
};
// SAFETY:
// Same as before : strictly monotic access in the tag slices
if unsafe { tag_ref.peek().position == element_index }
{
// The next tag in line is tagging the just-poped element.
// We get it
tag = Some(unsafe { tag_ref.take().tag });
self.read_tags += 1;
}
}
Some(Tagged::new(data, tag))
}
fn size_hint(&self) -> (usize, Option<usize>)
{
let len =
self.data_reader.slices().0.len() + self.data_reader.slices().1.len() - self.read_data;
(len, Some(len))
}
}
impl<'a, T> Drop for InIter<'a, T>
{
fn drop(&mut self)
{
let mut data_reader =
unsafe { ManuallyDrop::<StreamReader<'a, T>>::take(&mut self.data_reader) };
let mut tag_reader =
unsafe { ManuallyDrop::<StreamReader<'a, TagSlot>>::take(&mut self.tag_reader) };
tag_reader.consume(self.read_tags);
data_reader.consume(self.read_data);
}
}
impl<'a, T> ExactSizeIterator for InIter<'a, T> {}
impl<T: 'static> In<T>
{
pub fn iter<'a>(&'a mut self) -> InIter<'a, T>
{
let first_index = self.stream.as_ref().unwrap().first_index();
let data_reader = self.stream.as_mut().unwrap().read_takable();
let total_length = data_reader.slices().0.len() + data_reader.slices().1.len();
let tag_reader = self.tag_stream.as_mut().unwrap().read_takable();
let total_tag_length = tag_reader.slices().0.len() + tag_reader.slices().1.len();
InIter {
data_reader: ManuallyDrop::new(data_reader),
tag_reader: ManuallyDrop::new(tag_reader),
read_data: 0,
read_tags: 0,
total_length,
total_tag_length,
start_index: first_index,
}
}
}
impl<T: 'static> Out<T>
{
pub fn write_push<'a>(&'a mut self) -> OutPush<'a, T>
{
let first_index = self.stream.as_ref().unwrap().first_index();
let data_writer = self.stream.as_mut().unwrap().write();
let total_length = data_writer.slices().0.len() + data_writer.slices().1.len();
let tag_writer = self.tag_stream.as_mut().unwrap().write();
OutPush {
data_writer: ManuallyDrop::new(data_writer),
tag_writer: ManuallyDrop::new(tag_writer),
total_length,
written_data: 0,
written_tags: 0,
start_index: first_index,
}
}
/// Pushes an iterator to the output, sending the maximum amount of elements
/// to the output.
///
/// It will not consume the iterator more than what can be sent.
///
/// ```
/// let writer = output.write();
///
/// // Send only 42s to the output
/// writer.push_iter(std::iter::repeat(42));
/// ```
pub fn push_iter<I: Iterator<Item = Tagged<T>>>(&mut self, mut iter: I) -> bool
{
let mut pusher = self.write_push();
let mut len = pusher.len();
while len > 0
{
len -= 1;
match iter.next()
{
Some(element) => {
let _ = pusher.push(element);
},
None => return false,
}
}
return false;
}
/// Meta information
/// Returns a string of the type of the output
pub fn get_type_name(&self) -> &'static str
{
std::any::type_name::<T>()
}
}
/// Creates a stream that can then be used to link blocks
///
/// ```rust
/// let (output, input) = oxydsp-flowgraph::io::stream();
///
/// let writer = output.write();
/// let reader = input.read();
///
/// // ...
/// ```
pub fn stream<T>() -> (Out<T>, In<T>)
{
let edge = Arc::new(Mutex::new(Edge::default()));
@ -63,300 +339,97 @@ pub fn stream<T>() -> (Out<T>, In<T>)
)
}
impl<T: 'static> In<T>
pub fn streams<T, const N: usize>() -> ([Out<T>; N], [In<T>; N])
{
pub fn set_block_index(&self, index: BlockIOIndex)
{
self.edge.lock().unwrap().to = Some(index);
}
// Ugly simultanous initialization
let mut ins: [_; N] = std::array::from_fn(|_| None);
let mut outs: [_; N] = std::array::from_fn(|_| None);
pub fn get_producer_block(&self) -> Option<BlockIOIndex>
{
self.edge.lock().unwrap().from
}
ins.iter_mut()
.zip(outs.iter_mut())
.for_each(|(input, output)| {
let (newout, newin) = stream();
*input = Some(newin);
*output = Some(newout);
});
pub fn set_anonymous_stream(&mut self, consumer: AnonymousStreamConsumer)
{
let (stream, tag_stream) = consumer.downcast::<T>();
self.stream = Some(stream);
self.tag_stream = Some(tag_stream);
}
let ins_some: [_; N] = std::array::from_fn(|i| ins[i].take().unwrap());
let outs_some: [_; N] = std::array::from_fn(|i| outs[i].take().unwrap());
pub fn read<'a>(&'a mut self) -> InReader<'a, T>
{
let data_reader = self.stream.as_mut().unwrap().read();
let tag_reader = self.tag_stream.as_mut().unwrap().read();
InReader {
data_reader,
tag_reader,
}
}
(outs_some, ins_some)
}
impl<T: 'static> Out<T>
{
pub fn set_block_index(&self, index: BlockIOIndex)
{
self.edge.lock().unwrap().from = Some(index);
}
// --------------------
// Iterator facilites
// --------------------
pub fn get_consumer_block(&self) -> Option<BlockIOIndex>
{
self.edge.lock().unwrap().to
}
pub fn set_anonymous_stream(&mut self, producer: AnonymousStreamProducer)
{
let (stream, tag_stream) = producer.downcast::<T>();
self.stream = Some(stream);
self.tag_stream = Some(tag_stream);
}
// Delegate stream creation to Out object
// which knows the stream type
pub fn create_anonymous_stream(
&self,
capacity: usize,
) -> (AnonymousStreamProducer, AnonymousStreamConsumer)
{
let (tx, rx) = stream::bounded_queue::<T>(capacity);
let (tx_tag, rx_tag) = stream::bounded_queue::<TagSlot>(capacity);
((tx, tx_tag).into(), (rx, rx_tag).into())
}
pub fn write<'a>(&'a mut self) -> OutWriter<'a, T>
{
OutWriter {
data_writer: self.stream.as_mut().unwrap().write(),
tag_writer: self.tag_stream.as_mut().unwrap().write(),
}
}
pub fn push_iter<I: Iterator<Item = Tagged<T>>>(&mut self, mut iter: I) -> bool
{
let writer = self.write();
let len = writer.len();
for _ in 0..len
{
if let Some(elt) = iter.next()
{
let _ = writer.push(elt);
}
else
{
return false;
}
}
true
}
// Meta information
pub fn get_type_name(&self) -> &'static str
{
std::any::type_name::<T>()
}
}
impl<T> InReader<'_, T>
{
pub fn len(&self) -> usize
{
self.data_reader.len()
}
pub fn is_empty(&self) -> bool
{
self.len() == 0
}
pub fn pop(&self) -> Option<Tagged<T>>
{
let data = self.data_reader.pop_with_index();
if let Some((data, index)) = data
{
let mut tag = None;
if self
.tag_reader
.peek(|t| t.position)
.is_some_and(|x| x == index)
{
tag = self.tag_reader.pop();
}
Some((data, tag.map(|t| t.tag)).into())
}
else
{
None
}
}
pub fn pop_untag(&self) -> Option<T>
{
self.pop().map(|data| data.into_inner())
}
}
impl<T> OutWriter<'_, T>
{
pub fn len(&self) -> usize
{
self.data_writer.len().min(self.tag_writer.len())
}
pub fn is_empty(&self) -> bool
{
self.len() == 0
}
pub fn push(&self, data: Tagged<T>) -> Result<(), Tagged<T>>
{
let (data, tag) = data.into();
let position = self.data_writer.next_index();
let tag = tag.map(|t| TagSlot { position, tag: t });
match self.data_writer.push(data)
{
Ok(_) if tag.is_some() =>
{
let _ = self.tag_writer.push(tag.unwrap());
Ok(())
}
Ok(_) => Ok(()),
Err(data) => Err((data, tag.map(|t| t.tag)).into()),
}
}
pub fn push_no_tag(&self, data: T) -> Result<(), T>
{
self.data_writer.push(data)
}
}
pub struct PopIter<T>
{
len: usize,
popped: usize,
reader: T,
}
pub trait PopIterable<'a>
{
type Output;
fn pop_iter(&'a mut self) -> PopIter<Self::Output>;
}
impl<'a, T: 'static> PopIterable<'a> for In<T>
{
type Output = InReader<'a, T>;
fn pop_iter(&'a mut self) -> PopIter<InReader<'a, T>>
{
let reader = self.read();
PopIter {
popped: 0,
len: reader.len(),
reader,
}
}
}
generate_pop_iterable_tuple_impl! {1}
generate_pop_iterable_tuple_impl! {2}
generate_pop_iterable_tuple_impl! {3}
generate_pop_iterable_tuple_impl! {4}
generate_pop_iterable_tuple_impl! {5}
generate_pop_iterable_tuple_impl! {6}
generate_pop_iterable_tuple_impl! {7}
generate_pop_iterable_tuple_impl! {8}
generate_pop_iterable_tuple_impl! {9}
generate_pop_iterable_tuple_impl! {10}
generate_pop_iterable_tuple_impl! {11}
generate_pop_iterable_tuple_impl! {12}
impl<'a, T> Iterator for PopIter<InReader<'a, T>>
{
type Item = Tagged<T>;
fn next(&mut self) -> Option<Self::Item>
{
self.reader.pop()
}
}
impl_iterator_for_pop_iter_tuple! {1}
impl_iterator_for_pop_iter_tuple! {2}
impl_iterator_for_pop_iter_tuple! {3}
impl_iterator_for_pop_iter_tuple! {4}
impl_iterator_for_pop_iter_tuple! {5}
impl_iterator_for_pop_iter_tuple! {6}
impl_iterator_for_pop_iter_tuple! {7}
impl_iterator_for_pop_iter_tuple! {8}
impl_iterator_for_pop_iter_tuple! {9}
impl_iterator_for_pop_iter_tuple! {10}
impl_iterator_for_pop_iter_tuple! {11}
impl_iterator_for_pop_iter_tuple! {12}
// Needed for graph to be able to manipulate
// stream endings without knowing the generic type
pub struct AnonymousStreamProducer
{
inner: Box<dyn Any>,
inner_tag: StreamProducer<TagSlot>,
}
pub struct AnonymousStreamConsumer
{
inner: Box<dyn Any>,
inner_tag: StreamConsumer<TagSlot>,
}
impl<T: 'static> From<(StreamProducer<T>, StreamProducer<TagSlot>)> for AnonymousStreamProducer
{
fn from(value: (StreamProducer<T>, StreamProducer<TagSlot>)) -> Self
{
AnonymousStreamProducer {
inner: Box::new(value.0),
inner_tag: value.1,
}
}
}
impl<T: 'static> From<(StreamConsumer<T>, StreamConsumer<TagSlot>)> for AnonymousStreamConsumer
{
fn from(value: (StreamConsumer<T>, StreamConsumer<TagSlot>)) -> Self
{
AnonymousStreamConsumer {
inner: Box::new(value.0),
inner_tag: value.1,
}
}
}
impl AnonymousStreamProducer
{
pub(crate) fn downcast<T: 'static>(self) -> (StreamProducer<T>, StreamProducer<TagSlot>)
{
(
*self.inner.downcast::<StreamProducer<T>>().unwrap(),
self.inner_tag,
)
}
}
impl AnonymousStreamConsumer
{
pub(crate) fn downcast<T: 'static>(self) -> (StreamConsumer<T>, StreamConsumer<TagSlot>)
{
(
*self.inner.downcast::<StreamConsumer<T>>().unwrap(),
self.inner_tag,
)
}
}
// pub trait PushIterable<'a, T, I>
// where
// I: Iterator<Item = T>,
// An iterator type to push data to output(s)
// pub struct PopIter<T>
// {
// fn push_iter(&'a mut self, iter: I) -> bool;
// len: usize,
// popped: usize,
// reader: T,
// }
// Type on which data can be popped from
// pub trait PopIterable<'a>
// {
// type Output;
//
// /// Returns an iterator on the input elements :
// ///
// /// ```
// /// (&mut input_a, &mut input_b, &mut input_c).pop_iter().for_each(|(a, b, c)| println!("Got {a}, {b} and {c} !"));
// /// ```
// fn pop_iter(&'a mut self) -> PopIter<Self::Output>;
// }
// impl<'a, T: 'static> PopIterable<'a> for In<T>
// {
// type Output = InReader<'a, T>;
// fn pop_iter(&'a mut self) -> PopIter<InReader<'a, T>>
// {
// let reader = self.read();
// PopIter {
// popped: 0,
// len: reader.len(),
// reader,
// }
// }
// }
//
// generate_pop_iterable_tuple_impl! {1}
// generate_pop_iterable_tuple_impl! {2}
// generate_pop_iterable_tuple_impl! {3}
// generate_pop_iterable_tuple_impl! {4}
// generate_pop_iterable_tuple_impl! {5}
// generate_pop_iterable_tuple_impl! {6}
// generate_pop_iterable_tuple_impl! {7}
// generate_pop_iterable_tuple_impl! {8}
// generate_pop_iterable_tuple_impl! {9}
// generate_pop_iterable_tuple_impl! {10}
// generate_pop_iterable_tuple_impl! {11}
// generate_pop_iterable_tuple_impl! {12}
//
// impl<'a, T> Iterator for PopIter<InReader<'a, T>>
// {
// type Item = Tagged<T>;
//
// fn next(&mut self) -> Option<Self::Item>
// {
// self.reader.pop()
// }
// }
// impl_iterator_for_pop_iter_tuple! {1}
// impl_iterator_for_pop_iter_tuple! {2}
// impl_iterator_for_pop_iter_tuple! {3}
// impl_iterator_for_pop_iter_tuple! {4}
// impl_iterator_for_pop_iter_tuple! {5}
// impl_iterator_for_pop_iter_tuple! {6}
// impl_iterator_for_pop_iter_tuple! {7}
// impl_iterator_for_pop_iter_tuple! {8}
// impl_iterator_for_pop_iter_tuple! {9}
// impl_iterator_for_pop_iter_tuple! {10}
// impl_iterator_for_pop_iter_tuple! {11}
// impl_iterator_for_pop_iter_tuple! {12}

View File

@ -0,0 +1,182 @@
use std::any::Any;
use crate::{io::{In, Out}, stream::{self, StreamConsumer, StreamProducer}, tag::TagSlot};
/// Shared object between a block's input and output objects
/// so they can "communicate" and know about each other
#[derive(Default)]
pub struct Edge
{
/// Represents the index of the block owning the Out end in the graph
/// And the the output index within that block
pub from: Option<BlockIOIndex>,
/// Represents the index of the block owning the In end in the graph
/// And the the input index within that block
pub to: Option<BlockIOIndex>,
}
/// Reprensents the location of a port (input or output) in terms of
/// - The block index in which they exist
/// - Their input or output index within that block
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub struct BlockIOIndex
{
pub block_index: usize,
pub port_index: usize,
}
/// Trait to manipulate a block's input in a type agnostic/erased way
pub trait AnonymousIn
{
/// Inform the input about the index of the blocks it's in, as well as its port index
fn set_index(&self, index: BlockIOIndex);
/// Returns None or the block index of the block, and the block port of the corresponding
/// Out object
fn get_producer_block(&self) -> Option<BlockIOIndex>;
/// Sets the internal stream object
fn set_anonymous_stream(&mut self, consumer: AnonymousStreamConsumer);
}
/// Trait to manipulate a block's output in a type agnostic/erased way
pub trait AnonymousOut
{
/// Inform the output about the index of the blocks it's in, as well as its port index
fn set_index(&self, index: BlockIOIndex);
/// Sets the internal stream object
fn set_anonymous_stream(&mut self, producer: AnonymousStreamProducer);
/// Returns None or the block index of the block, and the block port of the corresponding
/// In object
fn get_consumer_block(&self) -> Option<BlockIOIndex>;
/// Creates the stream with the correct corresponding type, in a type erased way.
///
/// This delegation of stream creation is necessary to allow the graph to manipulate
/// it, as it cannot know about the generic type of the stream.
fn create_anonymous_stream(
&self,
capacity: usize,
) -> (AnonymousStreamProducer, AnonymousStreamConsumer);
}
impl<T: 'static> AnonymousIn for In<T>
{
fn set_index(&self, index: BlockIOIndex)
{
self.edge.lock().unwrap().to = Some(index);
}
fn get_producer_block(&self) -> Option<BlockIOIndex>
{
self.edge.lock().unwrap().from
}
fn set_anonymous_stream(&mut self, consumer: AnonymousStreamConsumer)
{
let (stream, tag_stream) = consumer.downcast::<T>();
self.stream = Some(stream);
self.tag_stream = Some(tag_stream);
}
}
impl<T: 'static> AnonymousOut for Out<T>
{
fn set_index(&self, index: BlockIOIndex)
{
self.edge.lock().unwrap().from = Some(index);
}
fn get_consumer_block(&self) -> Option<BlockIOIndex>
{
self.edge.lock().unwrap().to
}
fn set_anonymous_stream(&mut self, producer: AnonymousStreamProducer)
{
let (stream, tag_stream) = producer.downcast::<T>();
self.stream = Some(stream);
self.tag_stream = Some(tag_stream);
}
// Delegate stream creation to Out object
// which knows the stream type
fn create_anonymous_stream(
&self,
capacity: usize,
) -> (AnonymousStreamProducer, AnonymousStreamConsumer)
{
let (tx, rx) = stream::bounded_queue::<T>(capacity);
let (tx_tag, rx_tag) = stream::bounded_queue::<TagSlot>(capacity);
((tx, tx_tag).into(), (rx, rx_tag).into())
}
}
/// StreamProducer object for data and tags stored in a type
/// agnostic/erased way.
///
/// This is needed for the graph system to manipulate and pass arround these objects
/// as they can't/don't know about the generic types of the stream objects
pub struct AnonymousStreamProducer
{
inner: Box<dyn Any>,
inner_tag: StreamProducer<TagSlot>,
}
/// StreamConsumer object for data and tags stored in a type
/// agnostic/erased way.
///
/// This is needed for the graph system to manipulate and pass arround these objects
/// as they can't/don't know about the generic types of the stream objects
pub struct AnonymousStreamConsumer
{
inner: Box<dyn Any>,
inner_tag: StreamConsumer<TagSlot>,
}
impl<T: 'static> From<(StreamProducer<T>, StreamProducer<TagSlot>)> for AnonymousStreamProducer
{
fn from(value: (StreamProducer<T>, StreamProducer<TagSlot>)) -> Self
{
AnonymousStreamProducer {
inner: Box::new(value.0),
inner_tag: value.1,
}
}
}
impl<T: 'static> From<(StreamConsumer<T>, StreamConsumer<TagSlot>)> for AnonymousStreamConsumer
{
fn from(value: (StreamConsumer<T>, StreamConsumer<TagSlot>)) -> Self
{
AnonymousStreamConsumer {
inner: Box::new(value.0),
inner_tag: value.1,
}
}
}
impl AnonymousStreamProducer
{
pub(crate) fn downcast<T: 'static>(self) -> (StreamProducer<T>, StreamProducer<TagSlot>)
{
(
*self.inner.downcast::<StreamProducer<T>>().unwrap(),
self.inner_tag,
)
}
}
impl AnonymousStreamConsumer
{
pub(crate) fn downcast<T: 'static>(self) -> (StreamConsumer<T>, StreamConsumer<TagSlot>)
{
(
*self.inner.downcast::<StreamConsumer<T>>().unwrap(),
self.inner_tag,
)
}
}

View File

@ -1,9 +1,6 @@
// This crate manages the flowgraph datastructures and execution/scheduling
// as well as the communication between the blocks
/// This crate manages the flowgraph datastructures and execution/scheduling
/// as well as the communication between the blocks
pub mod block;
pub mod edge;
pub mod event;
pub mod graph;
pub mod io;
pub mod stream;

View File

@ -1 +0,0 @@
fn main() {}

View File

@ -1,6 +1,4 @@
use std::cell::Cell;
use std::cell::UnsafeCell;
use std::io::empty;
use std::mem::MaybeUninit;
use std::ops::Deref;
use std::sync::Arc;
@ -54,36 +52,35 @@ unsafe impl<T: Send> Sync for StreamProducer<T> {}
unsafe impl<T: Send> Send for StreamConsumer<T> {}
unsafe impl<T: Send> Sync for StreamConsumer<T> {}
// Represents a write operation within a stream producer
pub struct StreamWriter<'a, T>
{
producer: &'a mut StreamProducer<T>,
first: &'a UnsafeCell<[MaybeUninit<T>]>,
second: Option<&'a UnsafeCell<[MaybeUninit<T>]>>,
first_len: usize,
second_len: usize,
written: Cell<usize>,
#[repr(transparent)]
pub struct Takable<T>(MaybeUninit<T>);
// Index of the first element to be pushed
// within the "infinite buffer"
// Used to number tags
start_index: usize,
impl<T> Takable<T>
{
pub fn new(element: T) -> Self
{
Takable(MaybeUninit::new(element))
}
pub unsafe fn take(&mut self) -> T
{
unsafe { std::mem::replace(&mut self.0, MaybeUninit::uninit()).assume_init() }
}
pub unsafe fn peek(&self) -> &T
{
unsafe { self.0.assume_init_ref() }
}
pub unsafe fn peek_mut(&mut self) -> &mut T
{
unsafe { self.0.assume_init_mut() }
}
}
// Represents a read operation within a stream producer
pub struct StreamReader<'a, T>
unsafe fn takable_slice_from_maybe_uninitt<T>(slice: &mut [MaybeUninit<T>]) -> &mut [Takable<T>]
{
producer: &'a StreamConsumer<T>,
first: &'a UnsafeCell<[MaybeUninit<T>]>,
second: Option<&'a UnsafeCell<[MaybeUninit<T>]>>,
first_len: usize,
second_len: usize,
read: Cell<usize>,
// Index of the first element to be read
// within the "infinite buffer"
// Used to number tags
start_index: usize,
unsafe { std::mem::transmute(slice) }
}
pub fn bounded_queue<T>(capacity: usize) -> (StreamProducer<T>, StreamConsumer<T>)
@ -122,8 +119,78 @@ pub fn bounded_queue<T>(capacity: usize) -> (StreamProducer<T>, StreamConsumer<T
)
}
pub struct StreamReader<'a, T>
{
slices: (&'a mut [Takable<T>], &'a mut [Takable<T>]),
// UNSAFE !
inner: &'a mut StreamConsumer<T>,
}
pub struct StreamWriter<'a, T>
{
slices: (&'a mut [MaybeUninit<T>], &'a mut [MaybeUninit<T>]),
// UNSAFE !
inner: &'a mut StreamProducer<T>,
}
impl<'a, T> StreamReader<'a, T>
{
pub fn slices(&self) -> (&[Takable<T>], &[Takable<T>])
{
(self.slices.0, self.slices.1)
}
pub fn slices_mut(&mut self) -> (&mut [Takable<T>], &mut [Takable<T>])
{
(self.slices.0, self.slices.1)
}
pub fn consume(&mut self, read: usize)
{
self.inner.consume(read);
}
}
impl<'a, T> StreamWriter<'a, T>
{
pub fn slices(&self) -> (&[MaybeUninit<T>], &[MaybeUninit<T>])
{
(self.slices.0, self.slices.1)
}
pub fn slices_mut(&mut self) -> (&mut [MaybeUninit<T>], &mut [MaybeUninit<T>])
{
(self.slices.0, self.slices.1)
}
pub fn produce(&mut self, written: usize)
{
self.inner.produce(written);
}
}
impl<T> StreamProducer<T>
{
pub fn first_index(&self) -> usize
{
self.inner.head.load(Ordering::Relaxed)
}
pub fn produce(&mut self, written: usize)
{
// Advance head.
let head = self.inner.head.load(Ordering::Relaxed);
let tail = self.inner.tail.load(Ordering::Relaxed);
// Check bounds
assert!(head + written - tail <= (self.inner.capacity_mask + 1));
// We want writes to the buffer to be visible when acquired in the pop side
self.inner.head.store(head + written, Ordering::Release);
}
pub fn write<'a>(&'a mut self) -> StreamWriter<'a, T>
{
// We need to claim the maximum amount of elements.
@ -157,57 +224,26 @@ impl<T> StreamProducer<T>
let k = &mut *self.inner.buffer.get();
let (start_to_head, head_to_end) = k.split_at_mut_unchecked(wrapped_head);
let (start_to_tail, _tail_to_head) =
start_to_head.split_at_mut_unchecked(wrapped_tail);
// Slices are wrapped into unsafe cells to provide interior mutability
// On the stream as it is much more convienient.
//
// SAFETY:
//
// This functions borrows the stream mutably. As such, only one instance
// of StreamWriter can exist for a given stream. The StreamWriter
// is thus the only on able to write or read the stream when it lives
let first_len = head_to_end.len();
let second_len = start_to_tail.len();
let first = std::mem::transmute::<
&mut [MaybeUninit<T>],
&UnsafeCell<[MaybeUninit<T>]>,
>(head_to_end);
let second = Some(std::mem::transmute::<
&mut [MaybeUninit<T>],
&UnsafeCell<[MaybeUninit<T>]>,
>(start_to_tail));
// of these slices can exist for a given stream.
StreamWriter {
start_index: head,
producer: self,
first,
second,
first_len,
second_len,
written: 0.into(),
slices: (head_to_end, start_to_head),
inner: self,
}
}
}
else
{
// We MUST have : tail < head
if wrapped_tail < wrapped_head
{
//
// Or
// ▯▯▯▯▯▯▯▯▯▯▯▯▯
// |
// tail & head
// (empty)
// Current configuration :
// ▯▯▯▮▮▮▮▮▮▯▯▯▯
// | |
// tail head
// ___ ____
// slice1 slice2
// slice2 slice1
// SAFETY:
//
@ -224,27 +260,9 @@ impl<T> StreamProducer<T>
let (start_to_tail, _tail_to_head) =
start_to_head.split_at_mut_unchecked(wrapped_tail);
let first_len = head_to_end.len();
let second_len = start_to_tail.len();
let first = std::mem::transmute::<
&mut [MaybeUninit<T>],
&UnsafeCell<[std::mem::MaybeUninit<T>]>,
>(head_to_end);
let second = Some(std::mem::transmute::<
&mut [MaybeUninit<T>],
&UnsafeCell<[MaybeUninit<T>]>,
>(start_to_tail));
StreamWriter {
start_index: head,
producer: self,
first,
second,
first_len,
second_len,
written: 0.into(),
slices: (head_to_end, start_to_tail),
inner: self,
}
}
}
@ -262,8 +280,8 @@ impl<T> StreamProducer<T>
// |
// tail & head
// (full)
// ______.______
// slice2 slice1
// .
// slice1
// SAFETY:
//
@ -273,19 +291,14 @@ impl<T> StreamProducer<T>
// Head and tail are both indices of the slice
unsafe {
let k = &mut *self.inner.buffer.get();
let len = wrapped_tail - wrapped_head;
StreamWriter {
start_index: head,
let (start_to_tail, _tail_to_end) = k.split_at_mut_unchecked(wrapped_tail);
let (_start_to_head, head_to_tail) =
start_to_tail.split_at_mut_unchecked(wrapped_head);
let (empty_slice, head_to_tail) = head_to_tail.split_at_mut_unchecked(0);
producer: self,
first_len: len,
second_len: 0,
first: std::mem::transmute::<
&[MaybeUninit<T>],
&UnsafeCell<[MaybeUninit<T>]>,
>(&k[wrapped_head..wrapped_tail]),
second: None,
written: 0.into(),
StreamWriter {
slices: (head_to_tail, empty_slice),
inner: self,
}
}
}
@ -295,7 +308,25 @@ impl<T> StreamProducer<T>
impl<T> StreamConsumer<T>
{
pub fn read<'a>(&'a mut self) -> StreamReader<'a, T>
pub fn consume(&mut self, read: usize)
{
// Advance head.
let head = self.inner.head.load(Ordering::Relaxed);
let tail = self.inner.tail.load(Ordering::Relaxed);
// Check bounds
assert!(tail + read <= head);
// We want writes to the buffer to be visible when acquired in the pop side
self.inner.tail.store(tail + read, Ordering::Release);
}
pub fn first_index(&self) -> usize
{
self.inner.tail.load(Ordering::Relaxed)
}
pub fn read_takable<'a>(&'a mut self) -> StreamReader<'a, T>
{
// We need to claim the maximum amount of elements.
let head = self.inner.head.load(Ordering::Acquire);
@ -312,24 +343,20 @@ impl<T> StreamConsumer<T>
// Buffer is empty. Return empty slice
unsafe {
let k = &mut *self.inner.buffer.get();
let len = wrapped_head - wrapped_tail;
let empty = &mut k[0..0];
let (empty_1, empty_2) = empty.split_at_mut_unchecked(0);
StreamReader {
start_index: tail,
producer: self,
first_len: len,
second_len: 0,
first: std::mem::transmute::<&[MaybeUninit<T>], &UnsafeCell<[MaybeUninit<T>]>>(
&k[wrapped_tail..wrapped_head],
slices: (
takable_slice_from_maybe_uninitt(empty_1),
takable_slice_from_maybe_uninitt(empty_2),
),
second: None,
read: 0.into(),
inner: self,
}
}
}
else
{
// Necessarly: wrapped_tail < wrapped_head
// Necessarly: wrapped_tail <= wrapped_head
// Two cases : The buffer overlaps the wrapping or not
if wrapped_tail < wrapped_head
{
@ -347,20 +374,17 @@ impl<T> StreamConsumer<T>
//
// Head and tail are both indices of the slice
unsafe {
let k = &mut *self.inner.buffer.get();
let len = wrapped_head - wrapped_tail;
StreamReader {
start_index: tail,
let k = &mut (&mut *self.inner.buffer.get())[wrapped_tail..wrapped_head];
let (tail_to_head, empty_slice) =
k.split_at_mut_unchecked(wrapped_head - wrapped_tail);
assert_eq!(empty_slice.len(), 0);
producer: self,
first_len: len,
second_len: 0,
first: std::mem::transmute::<
&[MaybeUninit<T>],
&UnsafeCell<[MaybeUninit<T>]>,
>(&k[wrapped_tail..wrapped_head]),
second: None,
read: 0.into(),
StreamReader {
slices: (
takable_slice_from_maybe_uninitt(tail_to_head),
takable_slice_from_maybe_uninitt(empty_slice),
),
inner: self,
}
}
}
@ -396,293 +420,33 @@ impl<T> StreamConsumer<T>
let (start_to_head, _head_to_tail) =
start_to_tail.split_at_mut_unchecked(wrapped_head);
let first_len = tail_to_end.len();
let second_len = start_to_head.len();
let first = std::mem::transmute::<
&mut [MaybeUninit<T>],
&UnsafeCell<[MaybeUninit<T>]>,
>(tail_to_end);
let second = Some(std::mem::transmute::<
&mut [MaybeUninit<T>],
&UnsafeCell<[MaybeUninit<T>]>,
>(start_to_head));
StreamReader {
start_index: tail,
producer: self,
first,
second,
first_len,
second_len,
read: 0.into(),
slices: (
takable_slice_from_maybe_uninitt(tail_to_end),
takable_slice_from_maybe_uninitt(start_to_head),
),
inner: self,
}
}
}
}
}
/// Creates a reader of contiguous elements that
/// satisfy the predicate
pub fn read_while<F>(&mut self, predicate: F) -> StreamReader<'_, T>
where
F: Fn(&T) -> bool,
{
// Take a normal reader. This contains available elements to read.
let mut reader = self.read();
// We need to trim the slices to keep only the satified elements
// First slice
let mut first_kept = 0;
// SAFETY:
//
// Only us can have a reference to these slices of the buffer
for element in unsafe { &*reader.first.get() }
{
// SAFETY
//
// If this element is in a reader returned by self.read
// with no pop called, we know it is initialized
let init_element = unsafe { element.assume_init_ref() };
let sat = predicate(init_element);
if !sat
{
// Stop here
// Forget about second slice
reader.second_len = 0;
reader.second = None;
// Trim first slice
reader.first_len = first_kept;
unsafe {
reader.first = std::mem::transmute::<
&[MaybeUninit<T>],
&UnsafeCell<[MaybeUninit<T>]>,
>(&(&*reader.first.get())[0..first_kept]);
}
return reader;
}
first_kept += 1;
}
// If we are here, all of the elements of the first slice, satisfy the predicate
if let Some(second_slice) = &mut reader.second
{
// Second slice
let mut second_kept = 0;
// SAFETY:
//
// Only us can have a reference to these slices of the buffer
for element in unsafe { &*second_slice.get() }
{
// SAFETY
//
// If this element is in a reader returned by self.read
// with no pop called, we know it is initialized
let init_element = unsafe { element.assume_init_ref() };
let sat = predicate(init_element);
if !sat
{
// Stop here
// Trim second slice
reader.second_len = second_kept;
unsafe {
reader.second = Some(std::mem::transmute::<
&[MaybeUninit<T>],
&UnsafeCell<[MaybeUninit<T>]>,
>(
&(&*second_slice.get())[0..first_kept]
));
}
return reader;
}
second_kept += 1;
}
}
return reader;
}
}
impl<'a, T> StreamWriter<'a, T>
{
pub fn len(&self) -> usize
{
self.first_len + self.second_len
}
pub fn is_empty(&self) -> bool
{
self.len() == 0
}
pub fn next_index(&self) -> usize
{
self.start_index + self.written.get()
}
pub fn push(&self, element: T) -> Result<(), T>
{
if self.written.get() < self.first_len
{
unsafe {
(&mut *self.first.get())[self.written.get()] = MaybeUninit::new(element);
}
self.written.set(self.written.get() + 1);
Ok(())
}
else if let Some(second) = &self.second
&& self.written.get() - self.first_len < self.second_len
{
unsafe {
(&mut *second.get())[self.written.get() - self.first_len] =
MaybeUninit::new(element);
}
self.written.set(self.written.get() + 1);
Ok(())
}
else
{
Err(element)
}
}
}
impl<'a, T> StreamReader<'a, T>
{
pub fn len(&self) -> usize
{
self.first_len + self.second_len
}
pub fn is_empty(&self) -> bool
{
self.len() == 0
}
pub fn last_index(&self) -> usize
{
self.start_index + self.len()
}
pub fn next_index(&self) -> usize
{
self.start_index + self.read.get()
}
pub fn pop_with_index(&self) -> Option<(T, usize)>
{
let index = self.next_index();
self.pop().map(|t| (t, index))
}
pub fn peek<F, O>(&self, peeker: F) -> Option<O>
where
F: Fn(&T) -> O,
{
// Same as pop, without taking, or increasing read count
if self.read.get() < self.first_len
{
// SAFETY:
//
// If element is in this slice, it is initialized.
// We take it once since read increases
let element = unsafe { (&mut *self.first.get())[self.read.get()].assume_init_ref() };
Some(peeker(element))
}
else if let Some(second) = &self.second
&& self.read.get() - self.first_len < self.second_len
{
let element =
unsafe { (&mut *second.get())[self.read.get() - self.first_len].assume_init_ref() };
Some(peeker(element))
}
else
{
None
}
}
pub fn pop(&self) -> Option<T>
{
if self.read.get() < self.first_len
{
// SAFETY:
//
// If element is in this slice, it is initialized.
// We take it once since read increases
let element = unsafe {
std::mem::replace(
&mut (&mut *self.first.get())[self.read.get()],
MaybeUninit::uninit(),
)
.assume_init()
};
self.read.set(self.read.get() + 1);
Some(element)
}
else if let Some(second) = &self.second
&& self.read.get() - self.first_len < self.second_len
{
let element = unsafe {
std::mem::replace(
&mut (&mut *second.get())[self.read.get() - self.first_len],
MaybeUninit::uninit(),
)
.assume_init()
};
self.read.set(self.read.get() + 1);
Some(element)
}
else
{
None
}
}
}
// When a Stream writer goes out of scope, it wrote
// some things into the stream. These things need to be commited to the queue
impl<'a, T> Drop for StreamWriter<'a, T>
{
fn drop(&mut self)
{
// Advance head.
// We know that this value hasn't changed since this StreamWriter was created
let head = self.producer.inner.head.load(Ordering::Relaxed);
// We want writes to the buffer to be visible when acquired in the pop side
self.producer
.inner
.head
.store(head + self.written.get(), Ordering::Release);
}
}
// When a Stream reader goes out of scope, it took
// some things from the stream. These things need to be de-commited to the queue
impl<'a, T> Drop for StreamReader<'a, T>
{
fn drop(&mut self)
{
// Advance tail.
// We know that this value hasn't changed since this StreamWriter was created
let tail = self.producer.inner.tail.load(Ordering::Relaxed);
// We want writes to the buffer to be visible when acquired in the push side
self.producer
.inner
.tail
.store(tail + self.read.get(), Ordering::Release);
}
}
// impl<T: Copy> StreamConsumer<T>
// {
// pub fn read(&mut self) -> (&[T], &[T])
// {
// let (slice_1, slice_2) = self.read_takable();
// unsafe { (std::mem::transmute(slice_1), std::mem::transmute(slice_2)) }
// }
// }
mod test
{
#[allow(unused_imports)]
use std::mem::MaybeUninit;
#[allow(unused_imports)]
use crate::stream::bounded_queue;
@ -693,57 +457,76 @@ mod test
let (mut tx, mut rx) = bounded_queue::<usize>(4);
{
let writer = tx.write();
let mut writer = tx.write();
let (a, b) = writer.slices_mut();
assert_eq!(a.len(), 4);
assert_eq!(b.len(), 0);
assert_eq!(writer.len(), 4);
a[0] = MaybeUninit::new(0);
a[1] = MaybeUninit::new(1);
a[2] = MaybeUninit::new(2);
a[3] = MaybeUninit::new(3);
assert_eq!(writer.push(1), Ok(()));
assert_eq!(writer.push(2), Ok(()));
assert_eq!(writer.push(3), Ok(()));
assert_eq!(writer.push(4), Ok(()));
assert_eq!(writer.push(5), Err(5));
tx.produce(4);
}
{
let reader = rx.read();
let mut reader = rx.read_takable();
let (a, b) = reader.slices_mut();
assert_eq!(a.len(), 4);
assert_eq!(b.len(), 0);
assert_eq!(reader.len(), 4);
unsafe {
assert_eq!(a[0].take(), 0);
assert_eq!(a[1].take(), 1);
assert_eq!(a[2].take(), 2);
assert_eq!(a[3].take(), 3);
}
assert_eq!(reader.pop(), Some(1));
assert_eq!(reader.pop(), Some(2));
assert_eq!(reader.pop(), Some(3));
assert_eq!(reader.pop(), Some(4));
assert_eq!(reader.pop(), None);
rx.consume(4);
}
// Put stream into weird situatino
// Put stream into weird situation
{
let writer = tx.write();
assert_eq!(writer.push(1), Ok(()));
assert_eq!(writer.push(2), Ok(()));
assert_eq!(writer.push(3), Ok(()));
assert_eq!(writer.push(4), Ok(()));
let mut writer = tx.write();
let (a, b) = writer.slices_mut();
assert_eq!(a.len(), 4);
assert_eq!(b.len(), 0);
a[0] = MaybeUninit::new(0);
a[1] = MaybeUninit::new(1);
a[2] = MaybeUninit::new(2);
tx.produce(3);
}
{
let reader = rx.read();
assert_eq!(reader.pop(), Some(1));
assert_eq!(reader.pop(), Some(2));
let mut reader = rx.read_takable();
let (a, b) = reader.slices_mut();
assert_eq!(a.len(), 3);
assert_eq!(b.len(), 0);
unsafe {
assert_eq!(a[0].take(), 0);
assert_eq!(a[1].take(), 1);
assert_eq!(a[2].take(), 2);
}
rx.consume(1);
}
{
let writer = tx.write();
assert_eq!(writer.len(), 2);
assert_eq!(writer.push(5), Ok(()));
assert_eq!(writer.push(6), Ok(()));
let (a, b) = writer.slices();
assert_eq!(a.len(), 1);
assert_eq!(b.len(), 1);
}
{
let reader = rx.read();
assert_eq!(reader.pop(), Some(3));
assert_eq!(reader.pop(), Some(4));
assert_eq!(reader.pop(), Some(5));
assert_eq!(reader.pop(), Some(6));
let reader = rx.read_takable();
let (a, b) = reader.slices();
assert_eq!(a.len(), 2);
assert_eq!(b.len(), 0);
}
}
}

View File

@ -48,7 +48,7 @@ use std::ops::DerefMut;
use std::sync::Arc;
use std::sync::RwLock;
/// Object to allocate tags
/// Object to allocate tags and give a unique identifier per tag
struct TagAllocator
{
// Counter to uniquely identify allocated tags
@ -58,27 +58,32 @@ struct TagAllocator
labels: HashMap<usize, (&'static str, TagLabel)>,
}
// Label for a tag like : "symbol", "packet_start", "error"
/// Label for a tag like : "symbol", "packet_start", "error"
struct TagLabel
{
label: String,
// TODO: Allow user customization of labels
// maybe multiple labels
_label: String,
}
// Front for tag allocator
/// Object from which TagKeys are obtained. This guarantees absence of collisions between tags
pub struct Tags
{
allocator: Arc<RwLock<TagAllocator>>,
}
/// Used to anotate tags entries and retrieve them
#[derive(Clone)]
pub struct TagKey<T>
{
key: usize,
owner: Arc<RwLock<TagAllocator>>,
// Maybe used later to retrieve labels for example
_owner: Arc<RwLock<TagAllocator>>,
_phantom: PhantomData<T>,
}
// Tags a particular sample within a specific stream
/// Tags a particular sample within a specific stream
#[derive(Clone)]
pub(crate) struct TagSlot
{
@ -93,7 +98,7 @@ pub(crate) struct TagSlot
pub tag: Tag,
}
// Tag key value pairs
/// A Tag object containing TagKey-value pairs
#[derive(Clone)]
pub struct Tag
{
@ -102,6 +107,7 @@ pub struct Tag
impl Tags
{
/// Creates a new tag allocator
pub fn new() -> Self
{
Self {
@ -112,12 +118,13 @@ impl Tags
}
}
/// Allocates a new unique tag key
pub fn allocate_tag<T>(&mut self, label: impl AsRef<str>) -> TagKey<T>
{
let k = self.allocator.write().unwrap().allocate_tag::<T>(label);
TagKey {
key: k,
owner: self.allocator.clone(),
_owner: self.allocator.clone(),
_phantom: Default::default(),
}
}
@ -125,6 +132,7 @@ impl Tags
impl TagAllocator
{
/// Allocates a new unique tag key
pub fn allocate_tag<T>(&mut self, label: impl AsRef<str>) -> usize
{
let key = self.counter;
@ -133,7 +141,7 @@ impl TagAllocator
(
std::any::type_name::<T>(),
TagLabel {
label: label.as_ref().to_owned(),
_label: label.as_ref().to_owned(),
},
),
);
@ -144,6 +152,7 @@ impl TagAllocator
impl Default for Tags
{
/// Creates a new tag allocator
fn default() -> Self
{
Self::new()
@ -152,6 +161,7 @@ impl Default for Tags
impl Tag
{
/// Creates a new empty tag
pub fn new() -> Self
{
Self {
@ -159,6 +169,7 @@ impl Tag
}
}
/// Creates a new tag with a (key, value) entry
pub fn with_entry<T: 'static + Send + Sync>(key: TagKey<T>, value: T) -> Self
{
let new_tag = Self::default();
@ -166,7 +177,8 @@ impl Tag
new_tag
}
pub fn from_tags<const N: usize>(tag_opts: [&Tag; N]) -> Tag
/// Creates a new tag, which is the combination of the given tags
pub fn from_tags<const N: usize>(tag_opts: &[&Tag; N]) -> Tag
{
let new_tag = Self::default();
{
@ -181,31 +193,43 @@ impl Tag
new_tag
}
pub fn from_tag_opts<const N: usize>(tag_opts: [&Option<Tag>; N]) -> Option<Tag>
/// Creates a new tag option, which is the combination of the given tag options
///
/// If all the tag options are None, None is returned
/// Otherwise it is Some of the combination of all of the tags which are Some
pub fn from_tag_opts<'a>(mut tag_opts: impl Iterator<Item = &'a Option<Tag>>) -> Option<Tag>
{
if tag_opts.iter().all(|t| t.is_none())
{
return None;
}
let new_tag = Self::default();
let mut some_tags = 0;
{
let mut writer = new_tag.data.write().unwrap();
for tag in tag_opts.iter().filter(|t| t.is_some())
for tag in tag_opts.filter(|t| t.is_some())
{
some_tags += 1;
let reader = tag.as_ref().unwrap().data.read().unwrap();
writer.extend(reader.iter().map(|x| (*x.0, x.1.clone())));
}
}
Some(new_tag)
if some_tags > 0
{
Some(new_tag)
}
else
{
None
}
}
/// Adds a new entry in the tag. If it already exists, it is overwritten
pub fn add_entry<T: 'static + Send + Sync>(&self, key: TagKey<T>, value: T)
{
self.data.write().unwrap().insert(key.key, Arc::new(value));
}
/// Retrieves an entry in tag. If ther is no such entry corresponding to the key,
/// retruns None
pub fn retrieve<T: 'static + Send + Sync>(&self, key: &TagKey<T>) -> Option<Arc<T>>
{
let element = self.data.read().unwrap().get(&key.key).cloned();
@ -237,7 +261,7 @@ impl TagMergable<Tag> for Tag
{
fn merge(&self, other: &Self) -> Self
{
Self::from_tags([self, other])
Self::from_tags(&[self, other])
}
}
@ -245,7 +269,7 @@ impl TagMergable<Option<Tag>> for Option<Tag>
{
fn merge(&self, other: &Self) -> Self
{
Tag::from_tag_opts([self, other])
Tag::from_tag_opts([self, other].into_iter())
}
}