Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| dad09cddcf | |||
| 87921968b4 | |||
| 81cac2f239 |
8
Cargo.lock
generated
8
Cargo.lock
generated
@ -3076,6 +3076,14 @@ version = "0.1.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e3a9fe34e3e7a50316060351f37187a3f546bce95496156754b601a5fa71b76e"
|
||||
|
||||
[[package]]
|
||||
name = "simple"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"oxydsp-dsp",
|
||||
"oxydsp-flowgraph",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "slab"
|
||||
version = "0.4.12"
|
||||
|
||||
@ -8,7 +8,6 @@ use oxydsp_dsp::blocks::synthesis::Nco;
|
||||
use oxydsp_dsp::blocks::synthesis::OscillatorSource;
|
||||
use oxydsp_dsp::blocks::utilities::adapters::FlatMap;
|
||||
use oxydsp_dsp::blocks::utilities::adapters::Map;
|
||||
use oxydsp_dsp::blocks::utilities::adapters::Scan;
|
||||
use oxydsp_dsp::blocks::utilities::channels::RxSource;
|
||||
use oxydsp_dsp::blocks::utilities::channels::TxSink;
|
||||
use oxydsp_dsp::filtering::fir::Fir;
|
||||
@ -17,20 +16,17 @@ use oxydsp_flowgraph::flowgraph;
|
||||
use oxydsp_flowgraph::io::In;
|
||||
use rand::random;
|
||||
use std::f32::consts::PI;
|
||||
use std::net::UdpSocket;
|
||||
use std::ops::BitXor;
|
||||
use std::sync::mpsc;
|
||||
use std::sync::mpsc::Receiver;
|
||||
use std::sync::mpsc::SyncSender;
|
||||
use std::sync::mpsc::sync_channel;
|
||||
use std::thread::JoinHandle;
|
||||
use std::time::Duration;
|
||||
|
||||
use crate::CARRIER;
|
||||
use crate::DEVIATION;
|
||||
use crate::SAMPLE_PER_SYMBOL;
|
||||
use crate::SAMPLE_RATE;
|
||||
use crate::gaussian;
|
||||
use crate::to_bits;
|
||||
|
||||
pub struct Transmitter
|
||||
|
||||
BIN
examples/qpsk-modem/mod.wav
Normal file
BIN
examples/qpsk-modem/mod.wav
Normal file
Binary file not shown.
@ -1,61 +1,407 @@
|
||||
use std::time::Instant;
|
||||
|
||||
use cpal::traits::DeviceTrait;
|
||||
use cpal::traits::HostTrait;
|
||||
use egui::Color32;
|
||||
use egui_plot::Line;
|
||||
use egui_plot::PlotPoints;
|
||||
use egui_plot::Points;
|
||||
use num::Complex;
|
||||
use num::complex::ComplexFloat;
|
||||
use oxydsp_dsp::blocks::filtering::fir::FirFilter;
|
||||
use oxydsp_dsp::blocks::filtering::pulse_shaping::PulseShaper;
|
||||
use oxydsp_dsp::blocks::iq::zero_if::ZeroIf;
|
||||
use oxydsp_dsp::blocks::math::basic::Multiplier;
|
||||
use oxydsp_dsp::blocks::synthesis::OscillatorSource;
|
||||
use oxydsp_dsp::blocks::utilities::adapters::{Map, NullSink, Scan};
|
||||
use oxydsp_dsp::blocks::ted::early_late::EarlyLateGate;
|
||||
use oxydsp_dsp::blocks::utilities::adapters::Map;
|
||||
use oxydsp_dsp::blocks::utilities::adapters::Merger;
|
||||
use oxydsp_dsp::blocks::utilities::adapters::NullSink;
|
||||
use oxydsp_dsp::blocks::utilities::adapters::Scan;
|
||||
use oxydsp_dsp::blocks::utilities::adapters::ScanTagged;
|
||||
use oxydsp_dsp::blocks::utilities::adapters::Splitter;
|
||||
use oxydsp_dsp::blocks::utilities::channels::RxSource;
|
||||
use oxydsp_dsp::blocks::utilities::channels::TxSink;
|
||||
use oxydsp_dsp::blocks::utilities::graph_control::GraphKiller;
|
||||
use oxydsp_dsp::blocks::utilities::iter::IterSource;
|
||||
use oxydsp_dsp::filtering::fir::Fir;
|
||||
use oxydsp_dsp::filtering::history_buf::HistoryBuf;
|
||||
use oxydsp_dsp::map;
|
||||
use oxydsp_dsp::units::DigitalFrequency;
|
||||
use oxydsp_flowgraph::BlockIO;
|
||||
use oxydsp_flowgraph::block::Block;
|
||||
use oxydsp_flowgraph::flowgraph;
|
||||
use rand::{RngExt, SeedableRng, random};
|
||||
use oxydsp_flowgraph::io::In;
|
||||
use oxydsp_flowgraph::io::Out;
|
||||
use oxydsp_flowgraph::tag::Tagged;
|
||||
use oxydsp_flowgraph::tag::Tags;
|
||||
use rand::random;
|
||||
|
||||
const CARRRIER_FREQ: f64 = 1000.;
|
||||
const SAMPLE_RATE: usize = 48_000;
|
||||
const SAMPLE_PER_SYMBOL: usize = 100;
|
||||
|
||||
fn main()
|
||||
#[derive(BlockIO)]
|
||||
pub struct CostasLoop
|
||||
{
|
||||
let bits = (0..1024).map(|_| [random::<bool>(), random::<bool>()]);
|
||||
#[input]
|
||||
input: In<f32>,
|
||||
|
||||
let (iter_source, bits) = IterSource::new(bits.cycle());
|
||||
let (iq_map, iq) = Map::new(bits, |x| match x
|
||||
#[output]
|
||||
output: Out<Complex<f32>>,
|
||||
|
||||
center_freq: DigitalFrequency,
|
||||
nco: oxydsp_dsp::synthesis::oscillator::Nco<f32>,
|
||||
|
||||
loop_filter: oxydsp_dsp::filtering::fir::FirFilter<f32, f32, f32>,
|
||||
low_pass: oxydsp_dsp::filtering::fir::FirFilter<Complex<f32>, Complex<f32>, Complex<f32>>,
|
||||
}
|
||||
|
||||
impl CostasLoop
|
||||
{
|
||||
pub fn new(
|
||||
input: In<f32>,
|
||||
start_frequency: DigitalFrequency,
|
||||
loop_filter: Fir<f32>,
|
||||
) -> (Self, In<Complex<f32>>)
|
||||
{
|
||||
let (output, iq) = oxydsp_flowgraph::io::stream();
|
||||
(
|
||||
CostasLoop {
|
||||
input,
|
||||
output,
|
||||
center_freq: start_frequency,
|
||||
nco: start_frequency.into(),
|
||||
loop_filter: oxydsp_dsp::filtering::fir::FirFilter::new(loop_filter),
|
||||
low_pass: oxydsp_dsp::filtering::fir::FirFilter::new(
|
||||
Fir::lowpass(start_frequency, 100).normalized_len(),
|
||||
),
|
||||
},
|
||||
iq,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl Block for CostasLoop
|
||||
{
|
||||
fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult
|
||||
{
|
||||
self.output.push_iter(self.input.iter().map(|x| {
|
||||
let (signal, tag) = x.into();
|
||||
let lo = self.nco.next().unwrap();
|
||||
let iq = self
|
||||
.low_pass
|
||||
.next(Complex::new(lo.re * signal, lo.im * signal));
|
||||
|
||||
let error = iq.re * iq.im.signum() - iq.im * iq.re.signum();
|
||||
let correction = self.loop_filter.next(error);
|
||||
self.nco.set_frequency(DigitalFrequency::from_rad(
|
||||
self.center_freq.as_rad() + correction as f64,
|
||||
));
|
||||
|
||||
(iq, tag).into()
|
||||
}));
|
||||
oxydsp_flowgraph::block::BlockResult::Ok
|
||||
}
|
||||
}
|
||||
|
||||
pub fn main()
|
||||
{
|
||||
demodulator();
|
||||
//modulator();
|
||||
}
|
||||
|
||||
pub fn modulator()
|
||||
{
|
||||
let bit_source = (0..8192).map(|_| [random::<bool>(), random::<bool>()]);
|
||||
|
||||
let mut tags = Tags::default();
|
||||
|
||||
let (mut iter_source, bits) = IterSource::new(bit_source);
|
||||
let kill_tag = tags.allocate_tag("Kill tag");
|
||||
iter_source.tag_last_with(kill_tag.clone());
|
||||
|
||||
let (to_iq, iq) = Map::new(bits, |x| match x
|
||||
{
|
||||
[true, true] => Complex::new(1., 1.),
|
||||
[true, false] => Complex::new(1., -1.),
|
||||
[false, true] => Complex::new(-1., 1.),
|
||||
[false, false] => Complex::new(-1., -1.),
|
||||
});
|
||||
let (pulse_shaper, iq) = PulseShaper::new(iq, Fir::square(200), 200);
|
||||
|
||||
let (lo, carrier) = OscillatorSource::new(DigitalFrequency::from_time_frequency(CARRRIER_FREQ, SAMPLE_RATE as f64).into());
|
||||
// let (pulse_shaper, iq) = PulseShaper::new(
|
||||
// iq,
|
||||
// Fir::root_raised_cosine(4 * SAMPLE_PER_SYMBOL, 1., SAMPLE_PER_SYMBOL),
|
||||
// SAMPLE_PER_SYMBOL,
|
||||
// );
|
||||
let (pulse_shaper, iq) = PulseShaper::new(
|
||||
iq,
|
||||
Fir::gaussian(SAMPLE_PER_SYMBOL, 3.),
|
||||
SAMPLE_PER_SYMBOL,
|
||||
);
|
||||
let (carrier_oscillator, carrier) = OscillatorSource::new(
|
||||
DigitalFrequency::from_time_frequency(CARRRIER_FREQ, SAMPLE_RATE as f64).into(),
|
||||
);
|
||||
let (mixer, passband) = Multiplier::new(iq, carrier);
|
||||
|
||||
let (channel, passband) = Scan::new(passband, rand::rngs::SmallRng::seed_from_u64(0), |state, x|
|
||||
let (tx, rx) = std::sync::mpsc::channel::<Complex<f32>>();
|
||||
let (graph_killer, passband) = GraphKiller::new(passband, kill_tag);
|
||||
let tx_sink = TxSink::new(passband, tx);
|
||||
|
||||
let graph = flowgraph![
|
||||
iter_source,
|
||||
to_iq,
|
||||
pulse_shaper,
|
||||
carrier_oscillator,
|
||||
mixer,
|
||||
graph_killer,
|
||||
tx_sink
|
||||
];
|
||||
graph.run(1);
|
||||
|
||||
let spec = hound::WavSpec {
|
||||
channels: 1,
|
||||
sample_rate: SAMPLE_RATE as u32,
|
||||
bits_per_sample: 16,
|
||||
sample_format: hound::SampleFormat::Int,
|
||||
};
|
||||
let mut writer = hound::WavWriter::create("mod.wav", spec).unwrap();
|
||||
for iq in rx.iter()
|
||||
{
|
||||
x.re + state.sample::<f32, _>(rand_distr::StandardNormal)
|
||||
let amplitude = 0.5 * i16::MAX as f32;
|
||||
writer.write_sample((iq.re * amplitude) as i16).unwrap();
|
||||
}
|
||||
writer.finalize().unwrap();
|
||||
}
|
||||
|
||||
pub fn demodulator()
|
||||
{
|
||||
let (audio_tx, audio_rx) = std::sync::mpsc::channel();
|
||||
let (rx_source, signal) = RxSource::new(audio_rx);
|
||||
|
||||
let (downconverter, iq) = CostasLoop::new(
|
||||
signal,
|
||||
DigitalFrequency::from_time_frequency(CARRRIER_FREQ, SAMPLE_RATE as f64),
|
||||
Fir::proportional_integral(100, 0.01, 0.00005),
|
||||
);
|
||||
|
||||
//let mut agc_filter = oxydsp_dsp::filtering::fir::FirFilter::new(Fir::proportional_integral(100, 0.1, 0.001));
|
||||
let (agc_error_tx, agc_error_rx) = std::sync::mpsc::channel();
|
||||
let (agc, iq) = Scan::new(iq, (1., 0.), move |(gain, error_low), iq| {
|
||||
let out = *gain * iq;
|
||||
|
||||
let error = 1. - out.abs();
|
||||
let alpha = 0.01;
|
||||
*error_low = (1. - alpha) * *error_low + alpha * error;
|
||||
let _ = agc_error_tx.send(*error_low);
|
||||
*gain += 0.01 * error; // Feedback
|
||||
|
||||
out
|
||||
});
|
||||
|
||||
let (zero_if, iq) = ZeroIf::new(passband, DigitalFrequency::from_time_frequency(CARRRIER_FREQ, SAMPLE_RATE as f64).into());
|
||||
let (matched_filter, iq) = FirFilter::new(iq, Fir::<f32>::square(200));
|
||||
let (inspect, iq) = Scan::new(iq, (Instant::now(), 0), |(last, counter), x|
|
||||
{
|
||||
*counter += 1;
|
||||
if *counter >= 1_000_000
|
||||
{
|
||||
let time = Instant::now() - *last;
|
||||
println!("{:.2} Ms/s", 1. / time.as_secs_f32());
|
||||
*last = Instant::now();
|
||||
*counter = 0;
|
||||
}
|
||||
x
|
||||
});
|
||||
let null_sink = NullSink::new(iq);
|
||||
const DECIMATION: usize = 1;
|
||||
// let (matched_filter, iq) = FirFilter::<_, _, _, DECIMATION>::new_decimating(
|
||||
// iq,
|
||||
// Fir::<Complex<f32>>::lowpass(DigitalFrequency::from_time_frequency(2000., SAMPLE_RATE as f64), 100)
|
||||
// .normalized_len().convoluted_with(&Fir::<Complex<f32>>::root_raised_cosine(4 * SAMPLE_PER_SYMBOL, 1., SAMPLE_PER_SYMBOL)
|
||||
// .normalized_sqr())
|
||||
// );
|
||||
// let (matched_filter, iq) = FirFilter::<_, _, _, DECIMATION>::new_decimating(
|
||||
// iq,
|
||||
// Fir::<Complex<f32>>::lowpass(DigitalFrequency::from_time_frequency(2000., SAMPLE_RATE as f64), 100)
|
||||
// .normalized_len().convoluted_with(&Fir::<Complex<f32>>::gaussian(SAMPLE_PER_SYMBOL, 3.).normalized_sqr())
|
||||
// );
|
||||
let (matched_filter, iq) = FirFilter::<_, _, _, DECIMATION>::new_decimating(
|
||||
iq,
|
||||
Fir::<Complex<f32>>::gaussian(SAMPLE_PER_SYMBOL, 3.).normalized_sqr()
|
||||
);
|
||||
|
||||
let graph = flowgraph![iter_source, iq_map, pulse_shaper, lo, mixer, channel, zero_if, matched_filter, inspect, null_sink];
|
||||
graph.run(6).join();
|
||||
let (splitter, [iq_i, iq_q]) = Splitter::new(iq);
|
||||
let (proj_i, i) = Map::new(iq_i, |x| x.re);
|
||||
let (proj_q, q) = Map::new(iq_q, |x| x.im);
|
||||
|
||||
let mut tags = Tags::default();
|
||||
|
||||
let elg_filter = Fir::proportional_integral(100, 0.2, 0.002);
|
||||
let i_key = tags.allocate_tag("i tag");
|
||||
let q_key = tags.allocate_tag("q tag");
|
||||
let (elg_i, i) = EarlyLateGate::new(i, elg_filter.clone(), SAMPLE_PER_SYMBOL / DECIMATION, i_key.clone());
|
||||
let (elg_q, q) = EarlyLateGate::new(q, elg_filter.clone(), SAMPLE_PER_SYMBOL / DECIMATION, q_key.clone());
|
||||
|
||||
let (eye_i_tx, eye_i_rx) = std::sync::mpsc::channel::<Vec<f32>>();
|
||||
let (eye_q_tx, eye_q_rx) = std::sync::mpsc::channel::<Vec<f32>>();
|
||||
|
||||
let (merger, iq) = Merger::new([i, q]);
|
||||
|
||||
let (constellation_tx, constellation_rx) = std::sync::mpsc::channel();
|
||||
|
||||
let (debug, iq) = ScanTagged::new(
|
||||
iq,
|
||||
(
|
||||
HistoryBuf::new(0., (SAMPLE_PER_SYMBOL * 2) / DECIMATION),
|
||||
HistoryBuf::new(0., (SAMPLE_PER_SYMBOL * 2) / DECIMATION),
|
||||
),
|
||||
move |(buf_i, buf_q), input| {
|
||||
let ([re, im], tag) = input.into();
|
||||
buf_i.push(re);
|
||||
buf_q.push(im);
|
||||
|
||||
if tag.is_some_and(|t| t.retrieve(&i_key).is_some())
|
||||
{
|
||||
let _ = constellation_tx.send(Complex::new(re, im));
|
||||
let _ = eye_i_tx.send(buf_i.as_slice().iter().copied().collect::<Vec<_>>());
|
||||
let _ = eye_q_tx.send(buf_q.as_slice().iter().copied().collect::<Vec<_>>());
|
||||
}
|
||||
//(Complex::new(re, im), None).into()
|
||||
let k: Tagged<()> = ((), None).into();
|
||||
k
|
||||
},
|
||||
);
|
||||
let tx_sink = NullSink::new(iq);
|
||||
|
||||
let host = cpal::default_host();
|
||||
let device = host
|
||||
.default_input_device()
|
||||
.expect("no output device available");
|
||||
let mut supported_configs_range = device
|
||||
.supported_input_configs()
|
||||
.expect("error while querying configs");
|
||||
let supported_config = supported_configs_range
|
||||
.next()
|
||||
.expect("no supported config?!")
|
||||
.with_sample_rate(SAMPLE_RATE as u32);
|
||||
let _stream = device
|
||||
.build_input_stream(
|
||||
&supported_config.into(),
|
||||
move |data: &[f32], _: &cpal::InputCallbackInfo| {
|
||||
for x in data
|
||||
{
|
||||
let _ = audio_tx.send(*x * 100.);
|
||||
}
|
||||
},
|
||||
move |_err| {},
|
||||
None, // None=blocking, Some(Duration)=timeout
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let graph = flowgraph![
|
||||
rx_source,
|
||||
downconverter,
|
||||
agc,
|
||||
matched_filter,
|
||||
splitter,
|
||||
proj_i,
|
||||
proj_q,
|
||||
elg_i,
|
||||
elg_q,
|
||||
merger,
|
||||
debug,
|
||||
tx_sink
|
||||
];
|
||||
graph.run(6);
|
||||
|
||||
let mut eye_i_history = HistoryBuf::new(vec![], 200);
|
||||
let mut eye_q_history = HistoryBuf::new(vec![], 200);
|
||||
let mut constellation = HistoryBuf::new(Complex::new(0., 0.), 500);
|
||||
let mut agc_error = HistoryBuf::new(0., 50_000);
|
||||
eframe::run_simple_native("Window", Default::default(), move |ctx, _frame| {
|
||||
for eye in eye_i_rx.try_iter().take(200)
|
||||
{
|
||||
eye_i_history.push(eye);
|
||||
}
|
||||
for eye in eye_q_rx.try_iter().take(200)
|
||||
{
|
||||
eye_q_history.push(eye);
|
||||
}
|
||||
for point in constellation_rx.try_iter().take(500)
|
||||
{
|
||||
constellation.push(point);
|
||||
}
|
||||
for x in agc_error_rx.try_iter().take(5000)
|
||||
{
|
||||
agc_error.push(x);
|
||||
}
|
||||
|
||||
egui::CentralPanel::default().show(ctx, |ui| {
|
||||
egui_plot::Plot::new("plot")
|
||||
.data_aspect(1.)
|
||||
.show(ui, |plot_ui| {
|
||||
plot_ui.points(
|
||||
Points::new(
|
||||
"Constellation",
|
||||
constellation
|
||||
.as_slice()
|
||||
.iter()
|
||||
.map(|point| [point.re as f64, point.im as f64])
|
||||
.collect::<PlotPoints>(),
|
||||
)
|
||||
.color(Color32::YELLOW.gamma_multiply_u8(70)),
|
||||
);
|
||||
|
||||
plot_ui.line(
|
||||
Line::new(
|
||||
"AGC Error",
|
||||
agc_error
|
||||
.as_slice()
|
||||
.iter()
|
||||
.enumerate()
|
||||
.map(|(i, point)| [map(i as f64, 0., 50_000., -2., 2.), *point as f64 + 2.])
|
||||
.collect::<PlotPoints>(),
|
||||
)
|
||||
.color(Color32::YELLOW.gamma_multiply_u8(70)),
|
||||
);
|
||||
|
||||
for (eye_i, eye_q) in eye_i_history
|
||||
.as_slice()
|
||||
.iter()
|
||||
.zip(eye_q_history.as_slice().iter())
|
||||
{
|
||||
plot_ui.line(
|
||||
Line::new(
|
||||
"In-Phase",
|
||||
eye_i
|
||||
.iter()
|
||||
.enumerate()
|
||||
.map(|(i, x)| {
|
||||
[i as f64 / ((SAMPLE_PER_SYMBOL as f64 * 2.) / DECIMATION as f64) + 1., *x as f64]
|
||||
})
|
||||
.collect::<Vec<_>>(),
|
||||
)
|
||||
.color(Color32::RED),
|
||||
);
|
||||
|
||||
plot_ui.line(
|
||||
Line::new(
|
||||
"Quadrature",
|
||||
eye_q
|
||||
.iter()
|
||||
.enumerate()
|
||||
.map(|(i, x)| {
|
||||
[*x as f64, i as f64 / ((SAMPLE_PER_SYMBOL as f64 * 2.) / DECIMATION as f64) - 2.]
|
||||
})
|
||||
.collect::<Vec<_>>(),
|
||||
)
|
||||
.color(Color32::GREEN),
|
||||
);
|
||||
|
||||
// plot_ui.points(
|
||||
// Points::new(
|
||||
// "Constellation",
|
||||
// eye_i
|
||||
// .iter()
|
||||
// .zip(eye_q.iter())
|
||||
// .skip(SAMPLE_PER_SYMBOL / DECIMATION)
|
||||
// .step_by(SAMPLE_PER_SYMBOL / DECIMATION)
|
||||
// .map(|(i, q)| [*i as f64, *q as f64])
|
||||
// .collect::<PlotPoints>(),
|
||||
// )
|
||||
// .color(Color32::GREEN)
|
||||
// .radius(1.5),
|
||||
// );
|
||||
}
|
||||
});
|
||||
});
|
||||
ctx.request_repaint();
|
||||
})
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
pub fn to_bits(n: u8) -> [bool; 8]
|
||||
|
||||
8
examples/simple/Cargo.toml
Normal file
8
examples/simple/Cargo.toml
Normal file
@ -0,0 +1,8 @@
|
||||
[package]
|
||||
name = "simple"
|
||||
version = "0.1.0"
|
||||
edition = "2024"
|
||||
|
||||
[dependencies]
|
||||
oxydsp-flowgraph = {path = "../../oxydsp-flowgraph/"}
|
||||
oxydsp-dsp = {path = "../../oxydsp-dsp/"}
|
||||
31
examples/simple/src/main.rs
Normal file
31
examples/simple/src/main.rs
Normal file
@ -0,0 +1,31 @@
|
||||
use std::time::Duration;
|
||||
|
||||
use oxydsp_dsp::blocks::utilities::adapters::NullSink;
|
||||
use oxydsp_dsp::blocks::utilities::adapters::Scan;
|
||||
use oxydsp_dsp::blocks::utilities::channels::RxSource;
|
||||
use oxydsp_flowgraph::flowgraph;
|
||||
|
||||
fn main()
|
||||
{
|
||||
let (tx, rx) = std::sync::mpsc::channel();
|
||||
let (rx_source, numbers) = RxSource::new(rx);
|
||||
let (inspect, numbers) = Scan::new(numbers, 0, |state, x: usize| {
|
||||
if x.is_multiple_of(100)
|
||||
{
|
||||
println!("{}", x);
|
||||
}
|
||||
x
|
||||
});
|
||||
let null_sink = NullSink::new(numbers);
|
||||
let graph = flowgraph![rx_source, inspect, null_sink];
|
||||
|
||||
std::thread::spawn(move || {
|
||||
let mut x = 0usize;
|
||||
loop
|
||||
{
|
||||
let _ = tx.send(x);
|
||||
x += 1;
|
||||
}
|
||||
});
|
||||
graph.run(1).join();
|
||||
}
|
||||
@ -2,11 +2,9 @@ use num::Zero;
|
||||
use oxydsp_flowgraph::BlockIO;
|
||||
use oxydsp_flowgraph::block::Block;
|
||||
use oxydsp_flowgraph::block::BlockResult;
|
||||
use oxydsp_flowgraph::block::SyncBlock;
|
||||
use oxydsp_flowgraph::io::In;
|
||||
use oxydsp_flowgraph::io::Out;
|
||||
use oxydsp_flowgraph::io::PopIterable;
|
||||
use oxydsp_flowgraph::sync_block;
|
||||
use oxydsp_flowgraph::tag::Tag;
|
||||
use std::iter::Sum;
|
||||
use std::ops::Add;
|
||||
use std::ops::Mul;
|
||||
@ -14,7 +12,7 @@ use std::ops::Mul;
|
||||
use crate::filtering::fir::Fir;
|
||||
|
||||
#[derive(BlockIO)]
|
||||
pub struct FirFilter<F, T, O>
|
||||
pub struct FirFilter<F, T, O, const D: usize = 1>
|
||||
where
|
||||
T: Clone + Zero + 'static,
|
||||
F: Mul<T, Output = O> + Clone + 'static,
|
||||
@ -29,7 +27,7 @@ where
|
||||
filter: crate::filtering::fir::FirFilter<F, T, O>,
|
||||
}
|
||||
|
||||
impl<F, T, O> FirFilter<F, T, O>
|
||||
impl<F, T, O> FirFilter<F, T, O, 1>
|
||||
where
|
||||
T: Clone + Zero + 'static,
|
||||
F: Mul<T, Output = O> + Clone + 'static,
|
||||
@ -37,6 +35,19 @@ where
|
||||
{
|
||||
pub fn new(input: In<T>, impulse_response: Fir<F>) -> (Self, In<O>)
|
||||
{
|
||||
Self::new_decimating(input, impulse_response)
|
||||
}
|
||||
}
|
||||
|
||||
impl<F, T, O, const D: usize> FirFilter<F, T, O, D>
|
||||
where
|
||||
T: Clone + Zero + 'static,
|
||||
F: Mul<T, Output = O> + Clone + 'static,
|
||||
O: Add<O, Output = O> + Sum + Clone + Zero,
|
||||
{
|
||||
pub fn new_decimating(input: In<T>, impulse_response: Fir<F>) -> (Self, In<O>)
|
||||
{
|
||||
const { assert!(D != 0); };
|
||||
let (output, filtered) = oxydsp_flowgraph::io::stream();
|
||||
(
|
||||
Self {
|
||||
@ -49,15 +60,35 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
impl<F, T, O> Block for FirFilter<F, T, O>
|
||||
impl<F, T, O, const D: usize> Block for FirFilter<F, T, O, D>
|
||||
where
|
||||
T: Clone + Zero,
|
||||
F: Mul<T, Output = O> + Clone + 'static,
|
||||
O: Add<O, Output = O> + Sum + Clone + Zero,
|
||||
{
|
||||
fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult
|
||||
fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult
|
||||
{
|
||||
self.output.push_iter(self.input.pop_iter().map(|x| (self.filter.next(x.0), x.1).into()));
|
||||
if D > 1
|
||||
{
|
||||
let mut input_iter = self.input.iter();
|
||||
let mut output_pusher = self.output.write_push();
|
||||
while input_iter.len() >= D && output_pusher.len() > 0
|
||||
{
|
||||
// TODO: Maybe find a better way to do this.
|
||||
// I hope this is at least well optimized by the compilator
|
||||
let batch: [_; D] = std::array::from_fn(|_| input_iter.next().unwrap());
|
||||
let tag_batch: [_; D] = std::array::from_fn(|i| &batch[i].1);
|
||||
let data_batch: [_; D] = std::array::from_fn(|i| batch[i].0.clone());
|
||||
self.filter.insert_batch_ref(&data_batch);
|
||||
let _ = output_pusher
|
||||
.push((self.filter.filtered(), Tag::from_tag_opts(tag_batch.into_iter())).into());
|
||||
}
|
||||
}
|
||||
else if D == 1
|
||||
{
|
||||
self.output.push_iter(self.input.iter().map(|x| (self.filter.next(x.0), x.1).into()));
|
||||
}
|
||||
|
||||
BlockResult::Ok
|
||||
}
|
||||
}
|
||||
|
||||
@ -47,14 +47,14 @@ impl<T: 'static + std::ops::Mul<Output = T> + std::iter::Sum + std::clone::Clone
|
||||
{
|
||||
fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult
|
||||
{
|
||||
let reader = self.input.read();
|
||||
let writer = self.output.write();
|
||||
let mut reader = self.input.iter();
|
||||
let mut writer = self.output.write_push();
|
||||
|
||||
for _ in 0..writer.len()
|
||||
{
|
||||
if self.remaining == 0
|
||||
{
|
||||
if let Some(input) = reader.pop()
|
||||
if let Some(input) = reader.next()
|
||||
{
|
||||
let (data, tag) = input.into();
|
||||
let _ = writer.push((self.pulse_shaper.next(data), tag).into());
|
||||
|
||||
@ -1,12 +1,11 @@
|
||||
use std::fmt::Debug;
|
||||
|
||||
use num::Complex;
|
||||
use num::Float;
|
||||
use oxydsp_flowgraph::BlockIO;
|
||||
use oxydsp_flowgraph::block::Block;
|
||||
use oxydsp_flowgraph::block::SyncBlock;
|
||||
use oxydsp_flowgraph::io::In;
|
||||
use oxydsp_flowgraph::io::Out;
|
||||
use oxydsp_flowgraph::io::PopIterable;
|
||||
use oxydsp_flowgraph::sync_block;
|
||||
use rustfft::FftNum;
|
||||
|
||||
use crate::filtering::fir::Fir;
|
||||
@ -62,7 +61,7 @@ where
|
||||
{
|
||||
fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult
|
||||
{
|
||||
self.output.push_iter(self.input.pop_iter().map(|input| {
|
||||
self.output.push_iter(self.input.iter().map(|input| {
|
||||
let (data, tag) = input.into();
|
||||
// Mix
|
||||
let lo_sample = self.local_oscillator.next().unwrap();
|
||||
|
||||
@ -4,15 +4,11 @@ use std::ops::Mul;
|
||||
use oxydsp_flowgraph::BlockIO;
|
||||
use oxydsp_flowgraph::block::Block;
|
||||
use oxydsp_flowgraph::block::BlockResult;
|
||||
use oxydsp_flowgraph::block::SyncBlock;
|
||||
use oxydsp_flowgraph::io::In;
|
||||
use oxydsp_flowgraph::io::Out;
|
||||
use oxydsp_flowgraph::io::PopIterable;
|
||||
use oxydsp_flowgraph::sync_block;
|
||||
use oxydsp_flowgraph::tag::TagMergable;
|
||||
use oxydsp_flowgraph::tag::Tag;
|
||||
|
||||
#[derive(BlockIO)]
|
||||
#[sync_block]
|
||||
pub struct Adder<Ia, Ib, O>
|
||||
where
|
||||
Ia: Add<Ib, Output = O> + 'static,
|
||||
@ -49,35 +45,26 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
impl<'view, Ia, Ib, O> SyncBlock<'view> for Adder<Ia, Ib, O>
|
||||
impl<Ia, Ib, O> Block for Adder<Ia, Ib, O>
|
||||
where
|
||||
Ia: Add<Ib, Output = O> + 'static,
|
||||
Ib: 'static,
|
||||
O: 'static,
|
||||
{
|
||||
fn sync_work(_state: Self::StateView, input: Self::Input) -> Option<Self::Output>
|
||||
fn work(&mut self) -> BlockResult
|
||||
{
|
||||
Some(input.0 + input.1)
|
||||
self.output.push_iter(
|
||||
self.input_a
|
||||
.iter()
|
||||
.zip(self.input_b.iter())
|
||||
.map(|(x, y)| {
|
||||
(x.0 + y.0, Tag::from_tag_opts([&x.1, &y.1].into_iter())).into()
|
||||
})
|
||||
);
|
||||
BlockResult::Ok
|
||||
}
|
||||
}
|
||||
|
||||
// impl<Ia, Ib, O> Block for Adder<Ia, Ib, O>
|
||||
// where
|
||||
// Ia: Add<Ib, Output = O> + 'static,
|
||||
// Ib: 'static,
|
||||
// O: 'static,
|
||||
// {
|
||||
// fn work(&mut self) -> BlockResult
|
||||
// {
|
||||
// self.output.push_iter(
|
||||
// (&mut self.input_a, &mut self.input_b)
|
||||
// .pop_iter()
|
||||
// .map(|(a, b)| (a.0 + b.0, a.1.merge(&b.1))),
|
||||
// );
|
||||
// BlockResult::Ok
|
||||
// }
|
||||
// }
|
||||
|
||||
#[derive(BlockIO)]
|
||||
pub struct Multiplier<Ia, Ib, O>
|
||||
where
|
||||
@ -124,9 +111,12 @@ where
|
||||
fn work(&mut self) -> BlockResult
|
||||
{
|
||||
self.output.push_iter(
|
||||
(&mut self.input_a, &mut self.input_b)
|
||||
.pop_iter()
|
||||
.map(|(a, b)| (a.0 * b.0, a.1.merge(&b.1)).into()),
|
||||
self.input_a
|
||||
.iter()
|
||||
.zip(self.input_b.iter())
|
||||
.map(|(x, y)| {
|
||||
(x.0 * y.0, Tag::from_tag_opts([&x.1, &y.1].into_iter())).into()
|
||||
})
|
||||
);
|
||||
BlockResult::Ok
|
||||
}
|
||||
|
||||
@ -6,7 +6,6 @@ use oxydsp_flowgraph::block::Block;
|
||||
use oxydsp_flowgraph::block::BlockResult;
|
||||
use oxydsp_flowgraph::io::In;
|
||||
use oxydsp_flowgraph::io::Out;
|
||||
use oxydsp_flowgraph::io::PopIterable;
|
||||
use oxydsp_flowgraph::io::stream;
|
||||
|
||||
#[derive(BlockIO)]
|
||||
@ -81,7 +80,7 @@ impl<T: Float + From<f32> + 'static> Block for Nco<T>
|
||||
fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult
|
||||
{
|
||||
self.output
|
||||
.push_iter(&mut self.frequency.pop_iter().map(|f| {
|
||||
.push_iter(&mut self.frequency.iter().map(|f| {
|
||||
self.nco.set_frequency(f.0);
|
||||
(self.nco.next().unwrap(), f.1).into()
|
||||
}));
|
||||
|
||||
@ -1,21 +1,19 @@
|
||||
use std::collections::VecDeque;
|
||||
use std::iter::Sum;
|
||||
|
||||
use num::Float;
|
||||
use num::NumCast;
|
||||
use oxydsp_flowgraph::BlockIO;
|
||||
use oxydsp_flowgraph::block::SyncBlock;
|
||||
use oxydsp_flowgraph::block::Block;
|
||||
use oxydsp_flowgraph::io::In;
|
||||
use oxydsp_flowgraph::io::Out;
|
||||
use oxydsp_flowgraph::sync_block;
|
||||
use oxydsp_flowgraph::tag::Tag;
|
||||
use oxydsp_flowgraph::tag::TagKey;
|
||||
|
||||
use crate::filtering::fir::Fir;
|
||||
use crate::filtering::fir::FirFilter;
|
||||
use crate::filtering::history_buf::HistoryBuf;
|
||||
|
||||
#[derive(BlockIO)]
|
||||
#[sync_block(tagged)]
|
||||
pub struct EarlyLateGate<T: Float + Send + Sync + Sum + Clone + NumCast + 'static>
|
||||
{
|
||||
#[input]
|
||||
@ -27,7 +25,7 @@ pub struct EarlyLateGate<T: Float + Send + Sync + Sum + Clone + NumCast + 'stati
|
||||
symbol_length: usize,
|
||||
|
||||
// Window looking at symbol_length samples at a time
|
||||
window: VecDeque<T>,
|
||||
window: HistoryBuf<T>,
|
||||
|
||||
// The current location of the window, in relation to the last sample
|
||||
window_location: usize,
|
||||
@ -44,7 +42,7 @@ pub struct EarlyLateGate<T: Float + Send + Sync + Sum + Clone + NumCast + 'stati
|
||||
|
||||
impl<T> EarlyLateGate<T>
|
||||
where
|
||||
T: Float + Sum + Clone + 'static + Send + Sync + NumCast,
|
||||
T: Float + Sum + Clone + 'static + Send + Sync + NumCast + Default,
|
||||
{
|
||||
pub fn new(
|
||||
input: In<T>,
|
||||
@ -58,7 +56,7 @@ where
|
||||
Self {
|
||||
input,
|
||||
output,
|
||||
window: VecDeque::with_capacity(symbol_length),
|
||||
window: HistoryBuf::new(Default::default(), symbol_length),
|
||||
symbol_length,
|
||||
window_location: 0,
|
||||
window_center: symbol_length / 2,
|
||||
@ -72,48 +70,44 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
impl<'view, T> SyncBlock<'view> for EarlyLateGate<T>
|
||||
impl<'view, T> Block for EarlyLateGate<T>
|
||||
where
|
||||
T: Float + Sum + Clone + 'static + Send + Sync + NumCast,
|
||||
{
|
||||
fn sync_work(state: Self::StateView, input: Self::Input) -> Option<Self::Output>
|
||||
fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult
|
||||
{
|
||||
if state.window.len() < *state.symbol_length
|
||||
{
|
||||
state.window.push_back(input.0);
|
||||
*state.window_location += 1;
|
||||
return Some(input.0.into());
|
||||
}
|
||||
self.output.push_iter(self.input.iter().map(|input| {
|
||||
// Bring new sample in
|
||||
self.window.push(input.0);
|
||||
self.window_location += 1;
|
||||
|
||||
// Bring new sample in
|
||||
state.window.pop_front();
|
||||
state.window.push_back(input.0);
|
||||
*state.window_location += 1;
|
||||
let sample = self.window.as_slice()[self.window_center];
|
||||
let mut tag = None;
|
||||
if self.window_location >= self.next_sample as usize
|
||||
{
|
||||
let early_index = self.window_center - (0.25 * self.symbol_length as f32) as usize;
|
||||
let late_index = self.window_center + (0.25 * self.symbol_length as f32) as usize;
|
||||
|
||||
let sample = state.window[*state.window_center];
|
||||
let mut tag = None;
|
||||
if *state.window_location >= *state.next_sample as usize
|
||||
{
|
||||
let early_index = *state.window_center - (0.25 * *state.symbol_length as f32) as usize;
|
||||
let late_index = *state.window_center + (0.25 * *state.symbol_length as f32) as usize;
|
||||
let early_sample = self.window.as_slice()[early_index];
|
||||
let late_sample = self.window.as_slice()[late_index];
|
||||
|
||||
let early_sample = state.window[early_index];
|
||||
let late_sample = state.window[late_index];
|
||||
let error = (late_sample - early_sample) * sample;
|
||||
let correction = self.loop_filter.next(error);
|
||||
|
||||
let error = (late_sample - early_sample) * sample;
|
||||
let correction = state.loop_filter.next(error);
|
||||
// Figure out next sample location
|
||||
self.next_sample +=
|
||||
(self.symbol_length as f32 + correction.to_f32().unwrap()).max(0.);
|
||||
|
||||
// Figure out next sample location
|
||||
*state.next_sample +=
|
||||
(*state.symbol_length as f32 + correction.to_f32().unwrap()).max(0.);
|
||||
// Turn everything back relative to current sample
|
||||
self.next_sample -= self.window_location as f32;
|
||||
self.window_location = 0;
|
||||
|
||||
// Turn everything back relative to current sample
|
||||
*state.next_sample -= *state.window_location as f32;
|
||||
*state.window_location = 0;
|
||||
tag = Some(Tag::with_entry(self.symbol_tag_key.clone(), error));
|
||||
}
|
||||
|
||||
tag = Some(Tag::with_entry(state.symbol_tag_key.clone(), error));
|
||||
}
|
||||
(sample, tag).into()
|
||||
}));
|
||||
|
||||
Some((sample, tag).into())
|
||||
oxydsp_flowgraph::block::BlockResult::Ok
|
||||
}
|
||||
}
|
||||
|
||||
@ -2,3 +2,4 @@ pub mod adapters;
|
||||
pub mod channels;
|
||||
pub mod iter;
|
||||
pub mod squelch;
|
||||
pub mod graph_control;
|
||||
|
||||
@ -1,14 +1,12 @@
|
||||
use std::iter::FusedIterator;
|
||||
use std::mem::MaybeUninit;
|
||||
|
||||
use oxydsp_flowgraph::BlockIO;
|
||||
use oxydsp_flowgraph::block::Block;
|
||||
use oxydsp_flowgraph::block::BlockResult;
|
||||
use oxydsp_flowgraph::block::SyncBlock;
|
||||
use oxydsp_flowgraph::io::In;
|
||||
use oxydsp_flowgraph::io::Out;
|
||||
use oxydsp_flowgraph::io::PopIterable;
|
||||
use oxydsp_flowgraph::io::stream;
|
||||
use oxydsp_flowgraph::sync_block;
|
||||
use oxydsp_flowgraph::tag::Tag;
|
||||
use oxydsp_flowgraph::tag::TagMergable;
|
||||
use oxydsp_flowgraph::tag::Tagged;
|
||||
@ -27,7 +25,7 @@ pub struct Map<I: 'static, O: 'static, F>
|
||||
|
||||
impl<I: 'static, O: 'static, F> Map<I, O, F>
|
||||
where
|
||||
F: Fn(I) -> O,
|
||||
F: FnMut(I) -> O,
|
||||
{
|
||||
pub fn new(input: In<I>, map: F) -> (Self, In<O>)
|
||||
{
|
||||
@ -38,15 +36,12 @@ where
|
||||
|
||||
impl<I: 'static, O: 'static, F> Block for Map<I, O, F>
|
||||
where
|
||||
F: Fn(I) -> O,
|
||||
F: FnMut(I) -> O,
|
||||
{
|
||||
fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult
|
||||
{
|
||||
self.output.push_iter(
|
||||
self.input
|
||||
.pop_iter()
|
||||
.map(|x| ((&self.map)(x.0), x.1).into()),
|
||||
);
|
||||
self.output
|
||||
.push_iter(self.input.iter().map(|x| ((&mut self.map)(x.0), x.1).into()));
|
||||
BlockResult::Ok
|
||||
}
|
||||
}
|
||||
@ -80,12 +75,12 @@ where
|
||||
{
|
||||
fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult
|
||||
{
|
||||
let writer = self.output.write();
|
||||
let reader = self.input.read();
|
||||
let mut writer = self.output.write_push();
|
||||
let mut reader = self.input.iter();
|
||||
|
||||
for _ in 0..(writer.len().min(reader.len()))
|
||||
{
|
||||
let (input, tag_opt) = reader.pop().unwrap().into();
|
||||
let (input, tag_opt) = reader.next().unwrap().into();
|
||||
let (output, result) = (self.map)(input);
|
||||
let _ = writer.push((output, tag_opt).into());
|
||||
match result
|
||||
@ -131,12 +126,12 @@ where
|
||||
{
|
||||
fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult
|
||||
{
|
||||
let writer = self.output.write();
|
||||
let reader = self.input.read();
|
||||
let mut writer = self.output.write_push();
|
||||
let mut reader = self.input.iter();
|
||||
|
||||
for _ in 0..(writer.len().min(reader.len()))
|
||||
{
|
||||
let (input, tag_opt) = reader.pop().unwrap().into();
|
||||
let (input, tag_opt) = reader.next().unwrap().into();
|
||||
let (tagged_out, result) = (self.map)((input, tag_opt.clone()).into());
|
||||
let (output, tag_out) = tagged_out.into();
|
||||
|
||||
@ -158,7 +153,7 @@ where
|
||||
#[derive(BlockIO)]
|
||||
pub struct Scan<I: 'static, O: 'static, S, F>
|
||||
where
|
||||
F: Fn(&mut S, I) -> O,
|
||||
F: FnMut(&mut S, I) -> O,
|
||||
{
|
||||
#[input]
|
||||
input: In<I>,
|
||||
@ -173,7 +168,7 @@ where
|
||||
|
||||
impl<I: 'static, O: 'static, S, F> Scan<I, O, S, F>
|
||||
where
|
||||
F: Fn(&mut S, I) -> O,
|
||||
F: FnMut(&mut S, I) -> O,
|
||||
{
|
||||
pub fn new(input: In<I>, initial_state: S, map: F) -> (Self, In<O>)
|
||||
{
|
||||
@ -194,17 +189,20 @@ impl<I, O, S, F> Block for Scan<I, O, S, F>
|
||||
where
|
||||
I: 'static,
|
||||
O: 'static,
|
||||
F: Fn(&mut S, I) -> O,
|
||||
F: FnMut(&mut S, I) -> O,
|
||||
{
|
||||
fn work(&mut self) -> BlockResult {
|
||||
self.output.push_iter(self.input.pop_iter()
|
||||
.map(|x| ((self.map)(&mut self.state, x.0), x.1).into()));
|
||||
fn work(&mut self) -> BlockResult
|
||||
{
|
||||
self.output.push_iter(
|
||||
self.input
|
||||
.iter()
|
||||
.map(|x| ((self.map)(&mut self.state, x.0), x.1).into()),
|
||||
);
|
||||
BlockResult::Ok
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(BlockIO)]
|
||||
#[sync_block(tagged)]
|
||||
pub struct ScanTagged<I: 'static, O: 'static, S, F>
|
||||
where
|
||||
F: Fn(&mut S, Tagged<I>) -> Tagged<O>,
|
||||
@ -239,16 +237,20 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
impl<'view, I, O, S, F> SyncBlock<'view> for ScanTagged<I, O, S, F>
|
||||
impl<I, O, S, F> Block for ScanTagged<I, O, S, F>
|
||||
where
|
||||
I: 'static,
|
||||
O: 'static,
|
||||
S: 'view,
|
||||
F: Fn(&mut S, Tagged<I>) -> Tagged<O> + 'view,
|
||||
F: Fn(&mut S, Tagged<I>) -> Tagged<O>,
|
||||
{
|
||||
fn sync_work(state: Self::StateView, input: Self::Input) -> Option<Self::Output>
|
||||
fn work(&mut self) -> BlockResult
|
||||
{
|
||||
Some((*state.map)(state.state, input))
|
||||
self.output.push_iter(self.input.iter().map(|tagged| {
|
||||
let cloned_tag = tagged.1.clone();
|
||||
let (output, tag) = (self.map)(&mut self.state, (tagged.0, cloned_tag).into()).into();
|
||||
(output, Tag::from_tag_opts([&tagged.1, &tag].into_iter())).into()
|
||||
}));
|
||||
BlockResult::Ok
|
||||
}
|
||||
}
|
||||
|
||||
@ -288,15 +290,15 @@ impl<T: Clone + 'static> Block for Repeat<T>
|
||||
{
|
||||
fn work(&mut self) -> BlockResult
|
||||
{
|
||||
let writer = self.output.write();
|
||||
let reader = self.input.read();
|
||||
let mut writer = self.output.write_push();
|
||||
let mut reader = self.input.iter();
|
||||
let len = writer.len().min(reader.len() / self.repetitions + 1);
|
||||
|
||||
for _ in 0..len
|
||||
{
|
||||
if self.remaining == 0 || self.current.is_none()
|
||||
{
|
||||
if let Some(x) = reader.pop()
|
||||
if let Some(x) = reader.next()
|
||||
{
|
||||
self.current = Some(x.into());
|
||||
self.remaining = self.repetitions;
|
||||
@ -324,7 +326,6 @@ impl<T: Clone + 'static> Block for Repeat<T>
|
||||
}
|
||||
|
||||
#[derive(BlockIO)]
|
||||
#[sync_block]
|
||||
pub struct NullSink<T: 'static>
|
||||
{
|
||||
#[input]
|
||||
@ -339,12 +340,12 @@ impl<T: 'static> NullSink<T>
|
||||
}
|
||||
}
|
||||
|
||||
impl<'view, I: 'static> SyncBlock<'view> for NullSink<I>
|
||||
impl<I: 'static> Block for NullSink<I>
|
||||
{
|
||||
fn sync_work(_: Self::StateView, _: Self::Input) -> Option<Self::Output>
|
||||
fn work(&mut self) -> BlockResult
|
||||
{
|
||||
// Don't do shit !
|
||||
Some(())
|
||||
self.input.iter().for_each(|_| ());
|
||||
BlockResult::Ok
|
||||
}
|
||||
}
|
||||
|
||||
@ -383,12 +384,9 @@ impl<T: 'static + Clone> Block for Tee<T>
|
||||
{
|
||||
fn work(&mut self) -> BlockResult
|
||||
{
|
||||
let writer_a = self.output_a.write();
|
||||
let writer_b = self.output_b.write();
|
||||
for x in self
|
||||
.input
|
||||
.pop_iter()
|
||||
.take(writer_a.len().min(writer_b.len()))
|
||||
let mut writer_a = self.output_a.write_push();
|
||||
let mut writer_b = self.output_b.write_push();
|
||||
for x in self.input.iter().take(writer_a.len().min(writer_b.len()))
|
||||
{
|
||||
let _ = writer_a.push(x.clone());
|
||||
let _ = writer_b.push(x);
|
||||
@ -446,8 +444,8 @@ where
|
||||
{
|
||||
fn work(&mut self) -> BlockResult
|
||||
{
|
||||
let writer = self.output.write();
|
||||
let reader = self.input.read();
|
||||
let mut writer = self.output.write_push();
|
||||
let mut reader = self.input.iter();
|
||||
|
||||
let max_write = writer.len();
|
||||
let mut written = 0;
|
||||
@ -472,7 +470,7 @@ where
|
||||
if self.current_iter.is_none()
|
||||
{
|
||||
// Get input
|
||||
if let Some(input) = reader.pop()
|
||||
if let Some(input) = reader.next()
|
||||
{
|
||||
let mut new_iter = (self.map)(input.0).into_iter();
|
||||
if let Some(first_elt) = new_iter.next()
|
||||
@ -499,3 +497,122 @@ where
|
||||
BlockResult::Ok
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(BlockIO)]
|
||||
pub struct Splitter<T: 'static + Clone, const N: usize>
|
||||
{
|
||||
#[input]
|
||||
input: In<T>,
|
||||
|
||||
#[output]
|
||||
outputs: [Out<T>; N],
|
||||
}
|
||||
|
||||
impl<T: 'static + Clone, const N: usize> Splitter<T, N>
|
||||
{
|
||||
pub fn new(input: In<T>) -> (Self, [In<T>; N])
|
||||
{
|
||||
const { assert!(N > 0) }
|
||||
let (outs, ins) = oxydsp_flowgraph::io::streams();
|
||||
(
|
||||
Self {
|
||||
input,
|
||||
outputs: outs,
|
||||
},
|
||||
ins,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: 'static + Clone, const N: usize> Block for Splitter<T, N>
|
||||
{
|
||||
fn work(&mut self) -> BlockResult
|
||||
{
|
||||
let mut input_iter = self.input.iter();
|
||||
let mut outputs = self
|
||||
.outputs
|
||||
.iter_mut()
|
||||
.map(|x| x.write_push())
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let length = input_iter
|
||||
.len()
|
||||
.min(outputs.iter().map(|x| x.len()).min().unwrap());
|
||||
|
||||
for _ in 0..length
|
||||
{
|
||||
let pulled = input_iter.next().unwrap();
|
||||
outputs.iter_mut().for_each(|x| {
|
||||
let _ = x.push(pulled.clone());
|
||||
});
|
||||
}
|
||||
|
||||
BlockResult::Ok
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(BlockIO)]
|
||||
pub struct Merger<T: 'static, const N: usize>
|
||||
{
|
||||
#[input]
|
||||
inputs: [In<T>; N],
|
||||
|
||||
#[output]
|
||||
output: Out<[T; N]>,
|
||||
}
|
||||
|
||||
impl<T: 'static, const N: usize> Merger<T, N>
|
||||
{
|
||||
pub fn new(inputs: [In<T>; N]) -> (Self, In<[T; N]>)
|
||||
{
|
||||
const { assert!(N > 0) }
|
||||
let (out, inp) = oxydsp_flowgraph::io::stream();
|
||||
|
||||
(
|
||||
Self {
|
||||
inputs,
|
||||
output: out,
|
||||
},
|
||||
inp,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Sized + 'static, const N: usize> Block for Merger<T, N>
|
||||
{
|
||||
fn work(&mut self) -> BlockResult
|
||||
{
|
||||
let mut inputs = self.inputs.iter_mut().map(|x| x.iter()).collect::<Vec<_>>();
|
||||
let mut output = self.output.write_push();
|
||||
|
||||
let len = inputs
|
||||
.len()
|
||||
.min(inputs.iter().map(|x| x.len()).min().unwrap());
|
||||
|
||||
let mut datas: [_; N] = std::array::from_fn(|_| MaybeUninit::uninit());
|
||||
let mut tags: [_; N] = std::array::from_fn(|_| MaybeUninit::uninit());
|
||||
for _ in 0..len
|
||||
{
|
||||
for (i, (data, tag)) in inputs
|
||||
.iter_mut()
|
||||
.map(|x| x.next().unwrap().into())
|
||||
.enumerate()
|
||||
{
|
||||
datas[i] = MaybeUninit::new(data);
|
||||
tags[i] = MaybeUninit::new(tag);
|
||||
}
|
||||
|
||||
let ok_datas: [_; N] = unsafe {
|
||||
std::array::from_fn(|i| std::mem::replace(&mut datas[i], MaybeUninit::uninit()).assume_init())
|
||||
};
|
||||
let ok_tags = unsafe {
|
||||
std::mem::transmute::<&[MaybeUninit<Option<Tag>>; N], &[Option<Tag>; N]>(&tags)
|
||||
};
|
||||
|
||||
let tag = Tag::from_tag_opts(ok_tags.into_iter());
|
||||
output.push((ok_datas, tag).into());
|
||||
}
|
||||
|
||||
BlockResult::Ok
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,3 +1,4 @@
|
||||
use std::fmt::Debug;
|
||||
use std::sync::mpsc::Receiver;
|
||||
use std::sync::mpsc::Sender;
|
||||
use std::sync::mpsc::SyncSender;
|
||||
@ -7,7 +8,6 @@ use oxydsp_flowgraph::block::Block;
|
||||
use oxydsp_flowgraph::block::BlockResult;
|
||||
use oxydsp_flowgraph::io::In;
|
||||
use oxydsp_flowgraph::io::Out;
|
||||
use oxydsp_flowgraph::io::PopIterable;
|
||||
use oxydsp_flowgraph::io::stream;
|
||||
|
||||
#[derive(BlockIO)]
|
||||
@ -45,20 +45,13 @@ impl<Tx, I: 'static> TxSink<Tx, I>
|
||||
}
|
||||
}
|
||||
|
||||
impl<I: 'static> Block for RxSource<Receiver<I>, I>
|
||||
impl<I: 'static + Debug> Block for RxSource<Receiver<I>, I>
|
||||
{
|
||||
fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult
|
||||
{
|
||||
if self
|
||||
.output
|
||||
.push_iter(self.input.try_iter().map(|x| (x, None).into()))
|
||||
{
|
||||
BlockResult::Ok
|
||||
}
|
||||
else
|
||||
{
|
||||
BlockResult::Terminated
|
||||
}
|
||||
self.output
|
||||
.push_iter(self.input.try_iter().map(|x| (x, None).into()));
|
||||
BlockResult::Ok
|
||||
}
|
||||
}
|
||||
|
||||
@ -68,7 +61,7 @@ impl<I: 'static> Block for TxSink<Sender<I>, I>
|
||||
{
|
||||
if self
|
||||
.input
|
||||
.pop_iter()
|
||||
.iter()
|
||||
.map(|x| self.output.send(x.0))
|
||||
.any(|res| res.is_err())
|
||||
{
|
||||
@ -87,7 +80,7 @@ impl<I: 'static> Block for TxSink<SyncSender<I>, I>
|
||||
{
|
||||
if self
|
||||
.input
|
||||
.pop_iter()
|
||||
.iter()
|
||||
.map(|x| self.output.send(x.0))
|
||||
.any(|res| res.is_err())
|
||||
{
|
||||
|
||||
53
oxydsp-dsp/src/blocks/utilities/graph_control.rs
Normal file
53
oxydsp-dsp/src/blocks/utilities/graph_control.rs
Normal file
@ -0,0 +1,53 @@
|
||||
use oxydsp_flowgraph::{BlockIO, block::Block, io::{In, Out}, tag::TagKey};
|
||||
|
||||
#[derive(BlockIO)]
|
||||
pub struct GraphKiller<T: 'static, K: Send>
|
||||
{
|
||||
#[input]
|
||||
input: In<T>,
|
||||
|
||||
#[output]
|
||||
output: Out<T>,
|
||||
|
||||
kill_tag: TagKey<K>
|
||||
}
|
||||
|
||||
impl<T: 'static, K: Send + Send + 'static> GraphKiller<T, K>
|
||||
{
|
||||
pub fn new(input: In<T>, kill_on: TagKey<K>) -> (Self, In<T>)
|
||||
{
|
||||
let (output, port) = oxydsp_flowgraph::io::stream();
|
||||
(
|
||||
GraphKiller
|
||||
{
|
||||
input,
|
||||
output,
|
||||
kill_tag: kill_on,
|
||||
},
|
||||
port
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: 'static, K: Send + Sync + 'static> Block for GraphKiller<T, K>
|
||||
{
|
||||
fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult
|
||||
{
|
||||
let mut reader = self.input.iter();
|
||||
let mut writer = self.output.write_push();
|
||||
for _ in 0..reader.len().min(writer.len())
|
||||
{
|
||||
let (data, tag) = reader.next().unwrap().into();
|
||||
|
||||
if tag.as_ref().is_some_and(|t| t.retrieve(&self.kill_tag).is_some())
|
||||
{
|
||||
return oxydsp_flowgraph::block::BlockResult::Exit;
|
||||
}
|
||||
|
||||
let _ = writer.push((data, tag).into());
|
||||
}
|
||||
|
||||
|
||||
oxydsp_flowgraph::block::BlockResult::Ok
|
||||
}
|
||||
}
|
||||
@ -52,7 +52,7 @@ where
|
||||
{
|
||||
fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult
|
||||
{
|
||||
let writer = self.output.write();
|
||||
let mut writer = self.output.write_push();
|
||||
|
||||
for _ in 0..writer.len()
|
||||
{
|
||||
|
||||
@ -11,7 +11,6 @@ use oxydsp_flowgraph::block::Block;
|
||||
use oxydsp_flowgraph::block::BlockResult;
|
||||
use oxydsp_flowgraph::io::In;
|
||||
use oxydsp_flowgraph::io::Out;
|
||||
use oxydsp_flowgraph::io::PopIterable;
|
||||
|
||||
#[derive(BlockIO)]
|
||||
pub struct Squelch<T>
|
||||
@ -61,8 +60,8 @@ where
|
||||
{
|
||||
fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult
|
||||
{
|
||||
let writer = self.output.write();
|
||||
for x in self.input.pop_iter().take(writer.len())
|
||||
let mut writer = self.output.write_push();
|
||||
for x in self.input.iter().take(writer.len())
|
||||
{
|
||||
let (element, tag) = x.into();
|
||||
|
||||
|
||||
@ -1,13 +1,12 @@
|
||||
use num::Complex;
|
||||
use num::Float;
|
||||
use num::FromPrimitive;
|
||||
use num::Num;
|
||||
use num::One;
|
||||
use num::Zero;
|
||||
use num::complex::ComplexFloat;
|
||||
use rustfft::FftNum;
|
||||
use rustfft::FftPlanner;
|
||||
use std::array;
|
||||
use std::collections::VecDeque;
|
||||
use std::f64::consts::PI;
|
||||
use std::iter::Sum;
|
||||
use std::ops::Add;
|
||||
@ -18,7 +17,6 @@ use crate::filtering::history_buf::HistoryBuf;
|
||||
use crate::map;
|
||||
use crate::units::DigitalFrequency;
|
||||
|
||||
|
||||
/// Represents a finite impulse response as a vector
|
||||
/// of values in time.
|
||||
///
|
||||
@ -28,8 +26,33 @@ use crate::units::DigitalFrequency;
|
||||
///
|
||||
/// For a reverb ir for example the clap would be at index 0
|
||||
/// and the reverb tail towards the end of the vector.
|
||||
#[derive(Clone)]
|
||||
pub struct Fir<T>(pub Vec<T>);
|
||||
|
||||
impl<T> Fir<T>
|
||||
where
|
||||
T: Num + Clone + Sum,
|
||||
{
|
||||
pub fn convoluted_with(&self, other: &Fir<T>) -> Fir<T>
|
||||
{
|
||||
// Perform convolution
|
||||
let mut new_fir = vec![];
|
||||
let mut filter = FirFilter::<T, T, T>::new(self.clone());
|
||||
|
||||
for x in self.0.iter().rev()
|
||||
{
|
||||
new_fir.push(filter.next(x.clone()));
|
||||
}
|
||||
|
||||
for _ in 0..other.0.len()
|
||||
{
|
||||
new_fir.push(filter.next(T::zero()));
|
||||
}
|
||||
|
||||
Fir(new_fir)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Fir<Complex<T>>
|
||||
where
|
||||
T: FftNum + Float + Clone,
|
||||
@ -207,6 +230,32 @@ where
|
||||
.collect(),
|
||||
)
|
||||
}
|
||||
|
||||
/// Creates a gaussion fir of length `length`
|
||||
/// The maximum amplitude of the fir is 1.
|
||||
/// The gaussian curve is centered in the fir.
|
||||
/// The parameter `standard_deviations` specifies how many multiples of the
|
||||
/// standard deviations is on the left and right of the center before the fir cuts it off.
|
||||
pub fn gaussian(length: usize, standard_deviations: f32) -> Self
|
||||
{
|
||||
Self(
|
||||
(0..length)
|
||||
.map(|x| {
|
||||
let t = map(
|
||||
x as f64,
|
||||
0.,
|
||||
length as f64,
|
||||
-standard_deviations as f64,
|
||||
standard_deviations as f64,
|
||||
);
|
||||
|
||||
// Gaussian with sd=1
|
||||
let sq = t / 2.;
|
||||
T::from_f64(-sq * sq).unwrap().exp()
|
||||
})
|
||||
.collect(),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
/// A simple convolutional finite impulse response filter
|
||||
@ -217,7 +266,7 @@ where
|
||||
{
|
||||
fir: Vec<F>,
|
||||
//taps: VecDeque<T>,
|
||||
taps: HistoryBuf<T>
|
||||
taps: HistoryBuf<T>,
|
||||
}
|
||||
|
||||
impl<F, T, O> FirFilter<F, T, O>
|
||||
@ -236,14 +285,34 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
/// Gets the next output given an input sample.
|
||||
/// Inserts a new sample in the filter and get its new value
|
||||
///
|
||||
/// At the beginning, the delay line starts with zeroes.
|
||||
pub fn next(&mut self, input: T) -> O
|
||||
{
|
||||
self.insert(input);
|
||||
self.filtered()
|
||||
}
|
||||
|
||||
pub fn insert(&mut self, input: T)
|
||||
{
|
||||
self.taps.push(input);
|
||||
}
|
||||
|
||||
pub fn insert_batch_ref(&mut self, input: &[T])
|
||||
{
|
||||
for x in input.iter().cloned()
|
||||
{
|
||||
self.taps.push(x);
|
||||
}
|
||||
}
|
||||
|
||||
/// Gets the current value of the filter
|
||||
/// given the sampels that it currently holds
|
||||
pub fn filtered(&self) -> O
|
||||
{
|
||||
let taps = self.taps.as_slice();
|
||||
Self::dot_prod(&self.fir, taps)
|
||||
Self::dot_prod(&self.fir, taps)
|
||||
}
|
||||
|
||||
pub fn dot_prod(a: &[F], b: &[T]) -> O
|
||||
@ -256,12 +325,11 @@ where
|
||||
let (b_chunks, b_remainder) = b.as_chunks::<4>();
|
||||
|
||||
for (x, y) in a_chunks.iter().zip(b_chunks.iter())
|
||||
|
||||
{
|
||||
sum[0] = sum[0].clone() + x[0].clone() * y[0].clone();
|
||||
sum[1] = sum[1].clone() + x[1].clone() * y[1].clone();
|
||||
sum[2] = sum[2].clone() + x[2].clone() * y[2].clone();
|
||||
sum[3] = sum[3].clone() + x[3].clone() * y[3].clone();
|
||||
sum[0] = sum[0].clone() + x[0].clone() * y[0].clone();
|
||||
sum[1] = sum[1].clone() + x[1].clone() * y[1].clone();
|
||||
sum[2] = sum[2].clone() + x[2].clone() * y[2].clone();
|
||||
sum[3] = sum[3].clone() + x[3].clone() * y[3].clone();
|
||||
}
|
||||
|
||||
let mut sum = sum[0].clone() + sum[1].clone() + sum[2].clone() + sum[3].clone();
|
||||
|
||||
@ -56,7 +56,7 @@ fn block_io_get_inputs(fields: zyn::syn::Fields) -> zyn::TokenStream
|
||||
{
|
||||
let fields = fields.as_named().unwrap().named.clone();
|
||||
zyn::zyn!(
|
||||
fn get_inputs_mut(&mut self) -> Vec<&mut dyn oxydsp_flowgraph::io::AnonymousIn>
|
||||
fn get_inputs_mut(&mut self) -> Vec<&mut dyn oxydsp_flowgraph::io::edge::AnonymousIn>
|
||||
{
|
||||
let mut acc = vec![];
|
||||
use oxydsp_flowgraph::block::BlockInput;
|
||||
@ -67,7 +67,7 @@ fn block_io_get_inputs(fields: zyn::syn::Fields) -> zyn::TokenStream
|
||||
acc
|
||||
}
|
||||
|
||||
fn get_inputs(&self) -> Vec<&dyn oxydsp_flowgraph::io::AnonymousIn>
|
||||
fn get_inputs(&self) -> Vec<&dyn oxydsp_flowgraph::io::edge::AnonymousIn>
|
||||
{
|
||||
let mut acc = vec![];
|
||||
use oxydsp_flowgraph::block::BlockInput;
|
||||
@ -85,7 +85,7 @@ fn block_io_get_outputs(fields: zyn::syn::Fields) -> zyn::TokenStream
|
||||
{
|
||||
let fields = fields.as_named().unwrap().named.clone();
|
||||
zyn::zyn!(
|
||||
fn get_outputs_mut(&mut self) -> Vec<&mut dyn oxydsp_flowgraph::io::AnonymousOut>
|
||||
fn get_outputs_mut(&mut self) -> Vec<&mut dyn oxydsp_flowgraph::io::edge::AnonymousOut>
|
||||
{
|
||||
let mut acc = vec![];
|
||||
use oxydsp_flowgraph::block::BlockOutput;
|
||||
@ -96,7 +96,7 @@ fn block_io_get_outputs(fields: zyn::syn::Fields) -> zyn::TokenStream
|
||||
acc
|
||||
}
|
||||
|
||||
fn get_outputs(&self) -> Vec<&dyn oxydsp_flowgraph::io::AnonymousOut>
|
||||
fn get_outputs(&self) -> Vec<&dyn oxydsp_flowgraph::io::edge::AnonymousOut>
|
||||
{
|
||||
let mut acc = vec![];
|
||||
use oxydsp_flowgraph::block::BlockOutput;
|
||||
@ -141,9 +141,10 @@ fn block_io_get_meta(ident: zyn::syn::Ident, fields: zyn::syn::Fields) -> zyn::T
|
||||
fn get_output_type_names(&self) -> Vec<&'static str>
|
||||
{
|
||||
let mut output = Vec::new();
|
||||
use oxydsp_flowgraph::block::BlockOutput;
|
||||
@for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("output"))).enumerate())
|
||||
{
|
||||
output.push(self.{{ field.1.ident.clone() }}.get_type_name());
|
||||
output.extend(self.{{ field.1.ident.clone() }}.get_type_names());
|
||||
}
|
||||
return output;
|
||||
}
|
||||
|
||||
@ -1,5 +1,5 @@
|
||||
use crate::io::AnonymousIn;
|
||||
use crate::io::AnonymousOut;
|
||||
use crate::io::edge::AnonymousIn;
|
||||
use crate::io::edge::AnonymousOut;
|
||||
use crate::io::In;
|
||||
use crate::io::Out;
|
||||
use crate::io::edge::BlockIOIndex;
|
||||
@ -23,7 +23,7 @@ pub trait BlockInput
|
||||
fn get_inputs(&self) -> Vec<&dyn AnonymousIn>;
|
||||
|
||||
// Meta information
|
||||
fn get_types_names(&self) -> Vec<&'static str>;
|
||||
fn get_type_names(&self) -> Vec<&'static str>;
|
||||
}
|
||||
|
||||
pub trait BlockOutput
|
||||
@ -32,7 +32,7 @@ pub trait BlockOutput
|
||||
fn get_outputs(&self) -> Vec<&dyn AnonymousOut>;
|
||||
|
||||
// Meta information
|
||||
fn get_types_names(&self) -> Vec<&'static str>;
|
||||
fn get_type_names(&self) -> Vec<&'static str>;
|
||||
}
|
||||
|
||||
pub trait BlockIO
|
||||
@ -114,7 +114,7 @@ impl<T: 'static> BlockInput for In<T>
|
||||
vec![self]
|
||||
}
|
||||
|
||||
fn get_types_names(&self) -> Vec<&'static str>
|
||||
fn get_type_names(&self) -> Vec<&'static str>
|
||||
{
|
||||
vec![std::any::type_name::<T>()]
|
||||
}
|
||||
@ -146,11 +146,11 @@ impl<I: BlockInput> BlockInput for Option<I>
|
||||
}
|
||||
}
|
||||
|
||||
fn get_types_names(&self) -> Vec<&'static str>
|
||||
fn get_type_names(&self) -> Vec<&'static str>
|
||||
{
|
||||
if let Some(input) = self
|
||||
{
|
||||
input.get_types_names()
|
||||
input.get_type_names()
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -181,7 +181,7 @@ impl<I: BlockInput, const N: usize> BlockInput for [I; N]
|
||||
output
|
||||
}
|
||||
|
||||
fn get_types_names(&self) -> Vec<&'static str>
|
||||
fn get_type_names(&self) -> Vec<&'static str>
|
||||
{
|
||||
vec![std::any::type_name::<I>(); N]
|
||||
}
|
||||
@ -199,7 +199,7 @@ impl<T: 'static> BlockOutput for Out<T>
|
||||
vec![self]
|
||||
}
|
||||
|
||||
fn get_types_names(&self) -> Vec<&'static str>
|
||||
fn get_type_names(&self) -> Vec<&'static str>
|
||||
{
|
||||
vec![std::any::type_name::<T>()]
|
||||
}
|
||||
@ -231,11 +231,11 @@ impl<I: BlockOutput> BlockOutput for Option<I>
|
||||
}
|
||||
}
|
||||
|
||||
fn get_types_names(&self) -> Vec<&'static str>
|
||||
fn get_type_names(&self) -> Vec<&'static str>
|
||||
{
|
||||
if let Some(input) = self
|
||||
{
|
||||
input.get_types_names()
|
||||
input.get_type_names()
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -266,7 +266,7 @@ impl<I: BlockOutput, const N: usize> BlockOutput for [I; N]
|
||||
result
|
||||
}
|
||||
|
||||
fn get_types_names(&self) -> Vec<&'static str>
|
||||
fn get_type_names(&self) -> Vec<&'static str>
|
||||
{
|
||||
vec![std::any::type_name::<I>(); N]
|
||||
}
|
||||
|
||||
@ -5,7 +5,6 @@ use std::thread::JoinHandle;
|
||||
use crossbeam_deque::Steal;
|
||||
use crossbeam_deque::Worker;
|
||||
|
||||
use crate::block;
|
||||
use crate::block::GraphableBlock;
|
||||
use crate::io::edge::BlockIOIndex;
|
||||
|
||||
|
||||
@ -1,21 +1,18 @@
|
||||
use std::any::Any;
|
||||
use std::mem::ManuallyDrop;
|
||||
use std::mem::MaybeUninit;
|
||||
use std::sync::Arc;
|
||||
use std::sync::Mutex;
|
||||
|
||||
use oxydsp_flowgraph_macros::generate_pop_iterable_tuple_impl;
|
||||
use oxydsp_flowgraph_macros::impl_iterator_for_pop_iter_tuple;
|
||||
|
||||
use crate::stream::StreamConsumer;
|
||||
use crate::stream::StreamProducer;
|
||||
use crate::stream::StreamReader;
|
||||
use crate::stream::StreamWriter;
|
||||
use crate::stream::{self};
|
||||
use crate::tag::TagSlot;
|
||||
use crate::tag::Tagged;
|
||||
|
||||
pub mod edge;
|
||||
|
||||
use crate::io::edge::BlockIOIndex;
|
||||
use crate::io::edge::Edge;
|
||||
|
||||
/// Represents a input port for a block
|
||||
@ -38,107 +35,281 @@ pub struct Out<T>
|
||||
edge: Arc<Mutex<Edge>>,
|
||||
}
|
||||
|
||||
/// Trait to manipulate a block's input in a type agnostic/erased way
|
||||
pub trait AnonymousIn
|
||||
// Input Output interfaces
|
||||
|
||||
/// Output interface to write elements in a "push" fashion
|
||||
pub struct OutPush<'a, T>
|
||||
{
|
||||
/// Inform the input about the index of the blocks it's in, as well as its port index
|
||||
fn set_index(&self, index: BlockIOIndex);
|
||||
data_writer: ManuallyDrop<StreamWriter<'a, T>>,
|
||||
tag_writer: ManuallyDrop<StreamWriter<'a, TagSlot>>,
|
||||
|
||||
/// Returns None or the block index of the block, and the block port of the corresponding
|
||||
/// Out object
|
||||
fn get_producer_block(&self) -> Option<BlockIOIndex>;
|
||||
|
||||
/// Sets the internal stream object
|
||||
fn set_anonymous_stream(&mut self, consumer: AnonymousStreamConsumer);
|
||||
total_length: usize,
|
||||
written_data: usize,
|
||||
written_tags: usize,
|
||||
start_index: usize,
|
||||
}
|
||||
|
||||
/// Trait to manipulate a block's output in a type agnostic/erased way
|
||||
pub trait AnonymousOut
|
||||
impl<'a, T: 'static> OutPush<'a, T>
|
||||
{
|
||||
/// Inform the output about the index of the blocks it's in, as well as its port index
|
||||
fn set_index(&self, index: BlockIOIndex);
|
||||
pub fn len(&self) -> usize
|
||||
{
|
||||
let data_slices = self.data_writer.slices();
|
||||
// This gives better performance !
|
||||
// Probably because it prevents claiming the whole buffer at once
|
||||
// But this is hacky. It should probably be managed at a lower level
|
||||
// ((data_slices.0.len() + data_slices.1.len()) / 2) - self.written_data
|
||||
|
||||
/// Sets the internal stream object
|
||||
fn set_anonymous_stream(&mut self, producer: AnonymousStreamProducer);
|
||||
data_slices.0.len() + data_slices.1.len() - self.written_data
|
||||
}
|
||||
|
||||
/// Returns None or the block index of the block, and the block port of the corresponding
|
||||
/// In object
|
||||
fn get_consumer_block(&self) -> Option<BlockIOIndex>;
|
||||
pub fn is_empty(&self) -> bool
|
||||
{
|
||||
self.len() == 0
|
||||
}
|
||||
|
||||
/// Creates the stream with the correct corresponding type, in a type erased way.
|
||||
pub fn push(&mut self, data: Tagged<T>) -> Result<(), Tagged<T>>
|
||||
{
|
||||
// println!("\n\n\n");
|
||||
if self.written_data >= self.total_length
|
||||
{
|
||||
return Err(data);
|
||||
}
|
||||
|
||||
let data_slices = self.data_writer.slices_mut();
|
||||
let tag_slices = self.tag_writer.slices_mut();
|
||||
|
||||
// Write a data
|
||||
let data_ref = {
|
||||
if self.written_data < data_slices.0.len()
|
||||
{
|
||||
&mut data_slices.0[self.written_data]
|
||||
}
|
||||
else
|
||||
{
|
||||
&mut data_slices.1[self.written_data - data_slices.0.len()]
|
||||
}
|
||||
};
|
||||
// Index of the taken element within the stream.
|
||||
*data_ref = MaybeUninit::new(data.0);
|
||||
let element_index = self.start_index + self.written_data;
|
||||
self.written_data += 1;
|
||||
|
||||
// Check for corresponding tag
|
||||
let tag_ref = {
|
||||
if self.written_tags < tag_slices.0.len()
|
||||
{
|
||||
&mut tag_slices.0[self.written_tags]
|
||||
}
|
||||
else
|
||||
{
|
||||
&mut tag_slices.1[self.written_tags - tag_slices.0.len()]
|
||||
}
|
||||
};
|
||||
|
||||
if let Some(tag) = data.1
|
||||
{
|
||||
*tag_ref = MaybeUninit::new(TagSlot {
|
||||
position: element_index,
|
||||
tag,
|
||||
});
|
||||
self.written_tags += 1;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, T> Drop for OutPush<'a, T>
|
||||
{
|
||||
fn drop(&mut self)
|
||||
{
|
||||
let mut data_writer =
|
||||
unsafe { ManuallyDrop::<StreamWriter<'a, T>>::take(&mut self.data_writer) };
|
||||
let mut tag_writer =
|
||||
unsafe { ManuallyDrop::<StreamWriter<'a, TagSlot>>::take(&mut self.tag_writer) };
|
||||
|
||||
|
||||
tag_writer.produce(self.written_tags);
|
||||
data_writer.produce(self.written_data);
|
||||
}
|
||||
}
|
||||
|
||||
pub struct InIter<'a, T>
|
||||
{
|
||||
data_reader: ManuallyDrop<StreamReader<'a, T>>,
|
||||
tag_reader: ManuallyDrop<StreamReader<'a, TagSlot>>,
|
||||
|
||||
total_length: usize,
|
||||
total_tag_length: usize,
|
||||
read_data: usize,
|
||||
read_tags: usize,
|
||||
start_index: usize,
|
||||
}
|
||||
|
||||
impl<'a, T> Iterator for InIter<'a, T>
|
||||
{
|
||||
type Item = Tagged<T>;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item>
|
||||
{
|
||||
if self.read_data >= self.total_length
|
||||
{
|
||||
return None;
|
||||
}
|
||||
|
||||
let data_slices = self.data_reader.slices_mut();
|
||||
let tag_slices = self.tag_reader.slices_mut();
|
||||
|
||||
// Take a data
|
||||
// SAFETY:
|
||||
// All takable should contain a valid element: guarteed by the queue
|
||||
// We strictly monotonicly take all elements, in order, in the slices.
|
||||
// No two same indices should be taken.
|
||||
// We cound the number of taken elemnts and consume the correct amount from the queue
|
||||
let data = {
|
||||
if self.read_data < data_slices.0.len()
|
||||
{
|
||||
unsafe { data_slices.0[self.read_data].take() }
|
||||
}
|
||||
else
|
||||
{
|
||||
unsafe { data_slices.1[self.read_data - data_slices.0.len()].take() }
|
||||
}
|
||||
};
|
||||
// Index of the taken element within the stream.
|
||||
let element_index = self.start_index + self.read_data;
|
||||
self.read_data += 1;
|
||||
|
||||
// Check for corresponding tag
|
||||
let mut tag = None;
|
||||
if self.read_tags < self.total_tag_length
|
||||
{
|
||||
let tag_ref = {
|
||||
if self.read_tags < tag_slices.0.len()
|
||||
{
|
||||
&mut tag_slices.0[self.read_tags]
|
||||
}
|
||||
else
|
||||
{
|
||||
&mut tag_slices.1[self.read_tags - tag_slices.0.len()]
|
||||
}
|
||||
};
|
||||
|
||||
// SAFETY:
|
||||
// Same as before : strictly monotic access in the tag slices
|
||||
if unsafe { tag_ref.peek().position == element_index }
|
||||
{
|
||||
// The next tag in line is tagging the just-poped element.
|
||||
// We get it
|
||||
tag = Some(unsafe { tag_ref.take().tag });
|
||||
self.read_tags += 1;
|
||||
}
|
||||
}
|
||||
|
||||
Some(Tagged::new(data, tag))
|
||||
}
|
||||
|
||||
fn size_hint(&self) -> (usize, Option<usize>)
|
||||
{
|
||||
let len =
|
||||
self.data_reader.slices().0.len() + self.data_reader.slices().1.len() - self.read_data;
|
||||
(len, Some(len))
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, T> Drop for InIter<'a, T>
|
||||
{
|
||||
fn drop(&mut self)
|
||||
{
|
||||
let mut data_reader =
|
||||
unsafe { ManuallyDrop::<StreamReader<'a, T>>::take(&mut self.data_reader) };
|
||||
let mut tag_reader =
|
||||
unsafe { ManuallyDrop::<StreamReader<'a, TagSlot>>::take(&mut self.tag_reader) };
|
||||
tag_reader.consume(self.read_tags);
|
||||
data_reader.consume(self.read_data);
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, T> ExactSizeIterator for InIter<'a, T> {}
|
||||
|
||||
impl<T: 'static> In<T>
|
||||
{
|
||||
pub fn iter<'a>(&'a mut self) -> InIter<'a, T>
|
||||
{
|
||||
let first_index = self.stream.as_ref().unwrap().first_index();
|
||||
let data_reader = self.stream.as_mut().unwrap().read_takable();
|
||||
let total_length = data_reader.slices().0.len() + data_reader.slices().1.len();
|
||||
let tag_reader = self.tag_stream.as_mut().unwrap().read_takable();
|
||||
let total_tag_length = tag_reader.slices().0.len() + tag_reader.slices().1.len();
|
||||
|
||||
InIter {
|
||||
data_reader: ManuallyDrop::new(data_reader),
|
||||
tag_reader: ManuallyDrop::new(tag_reader),
|
||||
read_data: 0,
|
||||
read_tags: 0,
|
||||
total_length,
|
||||
total_tag_length,
|
||||
start_index: first_index,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: 'static> Out<T>
|
||||
{
|
||||
pub fn write_push<'a>(&'a mut self) -> OutPush<'a, T>
|
||||
{
|
||||
let first_index = self.stream.as_ref().unwrap().first_index();
|
||||
let data_writer = self.stream.as_mut().unwrap().write();
|
||||
let total_length = data_writer.slices().0.len() + data_writer.slices().1.len();
|
||||
|
||||
let tag_writer = self.tag_stream.as_mut().unwrap().write();
|
||||
|
||||
OutPush {
|
||||
data_writer: ManuallyDrop::new(data_writer),
|
||||
tag_writer: ManuallyDrop::new(tag_writer),
|
||||
total_length,
|
||||
written_data: 0,
|
||||
written_tags: 0,
|
||||
start_index: first_index,
|
||||
}
|
||||
}
|
||||
|
||||
/// Pushes an iterator to the output, sending the maximum amount of elements
|
||||
/// to the output.
|
||||
///
|
||||
/// This delegation of stream creation is necessary to allow the graph to manipulate
|
||||
/// it, as it cannot know about the generic type of the stream.
|
||||
fn create_anonymous_stream(
|
||||
&self,
|
||||
capacity: usize,
|
||||
) -> (AnonymousStreamProducer, AnonymousStreamConsumer);
|
||||
}
|
||||
|
||||
impl<T: 'static> AnonymousIn for In<T>
|
||||
{
|
||||
fn set_index(&self, index: BlockIOIndex)
|
||||
/// It will not consume the iterator more than what can be sent.
|
||||
///
|
||||
/// ```
|
||||
/// let writer = output.write();
|
||||
///
|
||||
/// // Send only 42s to the output
|
||||
/// writer.push_iter(std::iter::repeat(42));
|
||||
/// ```
|
||||
pub fn push_iter<I: Iterator<Item = Tagged<T>>>(&mut self, mut iter: I) -> bool
|
||||
{
|
||||
self.edge.lock().unwrap().to = Some(index);
|
||||
let mut pusher = self.write_push();
|
||||
let mut len = pusher.len();
|
||||
|
||||
while len > 0
|
||||
{
|
||||
len -= 1;
|
||||
match iter.next()
|
||||
{
|
||||
Some(element) => {
|
||||
let _ = pusher.push(element);
|
||||
},
|
||||
None => return false,
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
fn get_producer_block(&self) -> Option<BlockIOIndex>
|
||||
/// Meta information
|
||||
/// Returns a string of the type of the output
|
||||
pub fn get_type_name(&self) -> &'static str
|
||||
{
|
||||
self.edge.lock().unwrap().from
|
||||
std::any::type_name::<T>()
|
||||
}
|
||||
|
||||
fn set_anonymous_stream(&mut self, consumer: AnonymousStreamConsumer)
|
||||
{
|
||||
let (stream, tag_stream) = consumer.downcast::<T>();
|
||||
self.stream = Some(stream);
|
||||
self.tag_stream = Some(tag_stream);
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: 'static> AnonymousOut for Out<T>
|
||||
{
|
||||
fn set_index(&self, index: BlockIOIndex)
|
||||
{
|
||||
self.edge.lock().unwrap().from = Some(index);
|
||||
}
|
||||
|
||||
fn get_consumer_block(&self) -> Option<BlockIOIndex>
|
||||
{
|
||||
self.edge.lock().unwrap().to
|
||||
}
|
||||
|
||||
fn set_anonymous_stream(&mut self, producer: AnonymousStreamProducer)
|
||||
{
|
||||
let (stream, tag_stream) = producer.downcast::<T>();
|
||||
self.stream = Some(stream);
|
||||
self.tag_stream = Some(tag_stream);
|
||||
}
|
||||
|
||||
// Delegate stream creation to Out object
|
||||
// which knows the stream type
|
||||
fn create_anonymous_stream(
|
||||
&self,
|
||||
capacity: usize,
|
||||
) -> (AnonymousStreamProducer, AnonymousStreamConsumer)
|
||||
{
|
||||
let (tx, rx) = stream::bounded_queue::<T>(capacity);
|
||||
let (tx_tag, rx_tag) = stream::bounded_queue::<TagSlot>(capacity);
|
||||
((tx, tx_tag).into(), (rx, rx_tag).into())
|
||||
}
|
||||
}
|
||||
|
||||
/// A Reader to get data from an input
|
||||
pub struct InReader<'a, T>
|
||||
{
|
||||
data_reader: StreamReader<'a, T>,
|
||||
tag_reader: StreamReader<'a, TagSlot>,
|
||||
}
|
||||
|
||||
/// A writer to send data to an output
|
||||
pub struct OutWriter<'a, T>
|
||||
{
|
||||
data_writer: StreamWriter<'a, T>,
|
||||
tag_writer: StreamWriter<'a, TagSlot>,
|
||||
}
|
||||
|
||||
/// Creates a stream that can then be used to link blocks
|
||||
@ -168,306 +339,97 @@ pub fn stream<T>() -> (Out<T>, In<T>)
|
||||
)
|
||||
}
|
||||
|
||||
impl<T: 'static> In<T>
|
||||
pub fn streams<T, const N: usize>() -> ([Out<T>; N], [In<T>; N])
|
||||
{
|
||||
/// Gets a reader view from an input.
|
||||
///
|
||||
/// ```
|
||||
/// let reader = input.read();
|
||||
/// let data = reader.pop();
|
||||
/// ```
|
||||
pub fn read<'a>(&'a mut self) -> InReader<'a, T>
|
||||
{
|
||||
let data_reader = self.stream.as_mut().unwrap().read();
|
||||
let tag_reader = self.tag_stream.as_mut().unwrap().read();
|
||||
InReader {
|
||||
data_reader,
|
||||
tag_reader,
|
||||
}
|
||||
}
|
||||
// Ugly simultanous initialization
|
||||
let mut ins: [_; N] = std::array::from_fn(|_| None);
|
||||
let mut outs: [_; N] = std::array::from_fn(|_| None);
|
||||
|
||||
ins.iter_mut()
|
||||
.zip(outs.iter_mut())
|
||||
.for_each(|(input, output)| {
|
||||
let (newout, newin) = stream();
|
||||
*input = Some(newin);
|
||||
*output = Some(newout);
|
||||
});
|
||||
|
||||
let ins_some: [_; N] = std::array::from_fn(|i| ins[i].take().unwrap());
|
||||
let outs_some: [_; N] = std::array::from_fn(|i| outs[i].take().unwrap());
|
||||
|
||||
(outs_some, ins_some)
|
||||
}
|
||||
|
||||
impl<T: 'static> Out<T>
|
||||
{
|
||||
/// Gets a reader view from an output.
|
||||
///
|
||||
/// ```
|
||||
/// let writer = output.write();
|
||||
/// writer.push((data, tag).into());
|
||||
/// ```
|
||||
pub fn write<'a>(&'a mut self) -> OutWriter<'a, T>
|
||||
{
|
||||
OutWriter {
|
||||
data_writer: self.stream.as_mut().unwrap().write(),
|
||||
tag_writer: self.tag_stream.as_mut().unwrap().write(),
|
||||
}
|
||||
}
|
||||
// --------------------
|
||||
// Iterator facilites
|
||||
// --------------------
|
||||
|
||||
/// Pushes an iterator to the output, sending the maximum amount of elements
|
||||
/// to the output.
|
||||
///
|
||||
/// It will not consume the iterator more than what can be sent.
|
||||
///
|
||||
/// ```
|
||||
/// let writer = output.write();
|
||||
///
|
||||
/// // Send only 42s to the output
|
||||
/// writer.push_iter(std::iter::repeat(42));
|
||||
/// ```
|
||||
pub fn push_iter<I: Iterator<Item = Tagged<T>>>(&mut self, mut iter: I) -> bool
|
||||
{
|
||||
let writer = self.write();
|
||||
let len = writer.len();
|
||||
// An iterator type to push data to output(s)
|
||||
// pub struct PopIter<T>
|
||||
// {
|
||||
// len: usize,
|
||||
// popped: usize,
|
||||
// reader: T,
|
||||
// }
|
||||
|
||||
for _ in 0..len
|
||||
{
|
||||
if let Some(elt) = iter.next()
|
||||
{
|
||||
let _ = writer.push(elt);
|
||||
}
|
||||
else
|
||||
{
|
||||
return false;
|
||||
}
|
||||
}
|
||||
true
|
||||
}
|
||||
// Type on which data can be popped from
|
||||
// pub trait PopIterable<'a>
|
||||
// {
|
||||
// type Output;
|
||||
//
|
||||
// /// Returns an iterator on the input elements :
|
||||
// ///
|
||||
// /// ```
|
||||
// /// (&mut input_a, &mut input_b, &mut input_c).pop_iter().for_each(|(a, b, c)| println!("Got {a}, {b} and {c} !"));
|
||||
// /// ```
|
||||
// fn pop_iter(&'a mut self) -> PopIter<Self::Output>;
|
||||
// }
|
||||
|
||||
/// Meta information
|
||||
/// Returns a string of the type of the output
|
||||
pub fn get_type_name(&self) -> &'static str
|
||||
{
|
||||
std::any::type_name::<T>()
|
||||
}
|
||||
}
|
||||
// impl<'a, T: 'static> PopIterable<'a> for In<T>
|
||||
// {
|
||||
// type Output = InReader<'a, T>;
|
||||
// fn pop_iter(&'a mut self) -> PopIter<InReader<'a, T>>
|
||||
// {
|
||||
// let reader = self.read();
|
||||
// PopIter {
|
||||
// popped: 0,
|
||||
// len: reader.len(),
|
||||
// reader,
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// generate_pop_iterable_tuple_impl! {1}
|
||||
// generate_pop_iterable_tuple_impl! {2}
|
||||
// generate_pop_iterable_tuple_impl! {3}
|
||||
// generate_pop_iterable_tuple_impl! {4}
|
||||
// generate_pop_iterable_tuple_impl! {5}
|
||||
// generate_pop_iterable_tuple_impl! {6}
|
||||
// generate_pop_iterable_tuple_impl! {7}
|
||||
// generate_pop_iterable_tuple_impl! {8}
|
||||
// generate_pop_iterable_tuple_impl! {9}
|
||||
// generate_pop_iterable_tuple_impl! {10}
|
||||
// generate_pop_iterable_tuple_impl! {11}
|
||||
// generate_pop_iterable_tuple_impl! {12}
|
||||
//
|
||||
// impl<'a, T> Iterator for PopIter<InReader<'a, T>>
|
||||
// {
|
||||
// type Item = Tagged<T>;
|
||||
//
|
||||
// fn next(&mut self) -> Option<Self::Item>
|
||||
// {
|
||||
// self.reader.pop()
|
||||
// }
|
||||
// }
|
||||
|
||||
impl<T> InReader<'_, T>
|
||||
{
|
||||
/// Gets the amount of elements that are available
|
||||
/// on the input.
|
||||
pub fn len(&self) -> usize
|
||||
{
|
||||
self.data_reader.len()
|
||||
}
|
||||
|
||||
/// Returns true iif no elements are available on the input.
|
||||
pub fn is_empty(&self) -> bool
|
||||
{
|
||||
self.len() == 0
|
||||
}
|
||||
|
||||
/// Pops an element from the input.
|
||||
/// It is guaranteed to return `Some(data)` if
|
||||
/// if pop was called strictly less times than len
|
||||
pub fn pop(&self) -> Option<Tagged<T>>
|
||||
{
|
||||
let data = self.data_reader.pop_with_index();
|
||||
if let Some((data, index)) = data
|
||||
{
|
||||
let mut tag = None;
|
||||
if self
|
||||
.tag_reader
|
||||
.peek(|t| t.position)
|
||||
.is_some_and(|x| x == index)
|
||||
{
|
||||
tag = self.tag_reader.pop();
|
||||
}
|
||||
Some((data, tag.map(|t| t.tag)).into())
|
||||
}
|
||||
else
|
||||
{
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
/// Pops an element from the input, discarding the tag.
|
||||
/// It is guaranteed to return `Some(data)` if
|
||||
/// if pop was called strictly less times than len
|
||||
pub fn pop_untag(&self) -> Option<T>
|
||||
{
|
||||
self.pop().map(|data| data.into_inner())
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> OutWriter<'_, T>
|
||||
{
|
||||
/// Gets how much room is available on the output
|
||||
pub fn len(&self) -> usize
|
||||
{
|
||||
self.data_writer.len().min(self.tag_writer.len())
|
||||
}
|
||||
|
||||
/// Returns true iif no element can be sent
|
||||
pub fn is_empty(&self) -> bool
|
||||
{
|
||||
self.len() == 0
|
||||
}
|
||||
|
||||
/// Pushes some tagged data on the input.
|
||||
///
|
||||
/// The operation succeeds (`Ok(())`) if there is enough room
|
||||
/// Or fails returning the given data to the caller.
|
||||
pub fn push(&self, data: Tagged<T>) -> Result<(), Tagged<T>>
|
||||
{
|
||||
let (data, tag) = data.into();
|
||||
let position = self.data_writer.next_index();
|
||||
let tag = tag.map(|t| TagSlot { position, tag: t });
|
||||
|
||||
match self.data_writer.push(data)
|
||||
{
|
||||
Ok(_) if tag.is_some() =>
|
||||
{
|
||||
let _ = self.tag_writer.push(tag.unwrap());
|
||||
Ok(())
|
||||
}
|
||||
Ok(_) => Ok(()),
|
||||
Err(data) => Err((data, tag.map(|t| t.tag)).into()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Pushes some data on the input (not tagged).
|
||||
///
|
||||
/// The operation succeeds (`Ok(())`) if there is enough room
|
||||
/// Or fails returning the given data to the caller.
|
||||
pub fn push_no_tag(&self, data: T) -> Result<(), T>
|
||||
{
|
||||
self.data_writer.push(data)
|
||||
}
|
||||
}
|
||||
|
||||
/// An iterator type to push data to output(s)
|
||||
pub struct PopIter<T>
|
||||
{
|
||||
len: usize,
|
||||
popped: usize,
|
||||
reader: T,
|
||||
}
|
||||
|
||||
/// Type on which data can be popped from
|
||||
pub trait PopIterable<'a>
|
||||
{
|
||||
type Output;
|
||||
|
||||
/// Returns an iterator on the input elements :
|
||||
///
|
||||
/// ```
|
||||
/// (&mut input_a, &mut input_b, &mut input_c).pop_iter().for_each(|(a, b, c)| println!("Got {a}, {b} and {c} !"));
|
||||
/// ```
|
||||
fn pop_iter(&'a mut self) -> PopIter<Self::Output>;
|
||||
}
|
||||
|
||||
impl<'a, T: 'static> PopIterable<'a> for In<T>
|
||||
{
|
||||
type Output = InReader<'a, T>;
|
||||
fn pop_iter(&'a mut self) -> PopIter<InReader<'a, T>>
|
||||
{
|
||||
let reader = self.read();
|
||||
PopIter {
|
||||
popped: 0,
|
||||
len: reader.len(),
|
||||
reader,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
generate_pop_iterable_tuple_impl! {1}
|
||||
generate_pop_iterable_tuple_impl! {2}
|
||||
generate_pop_iterable_tuple_impl! {3}
|
||||
generate_pop_iterable_tuple_impl! {4}
|
||||
generate_pop_iterable_tuple_impl! {5}
|
||||
generate_pop_iterable_tuple_impl! {6}
|
||||
generate_pop_iterable_tuple_impl! {7}
|
||||
generate_pop_iterable_tuple_impl! {8}
|
||||
generate_pop_iterable_tuple_impl! {9}
|
||||
generate_pop_iterable_tuple_impl! {10}
|
||||
generate_pop_iterable_tuple_impl! {11}
|
||||
generate_pop_iterable_tuple_impl! {12}
|
||||
|
||||
impl<'a, T> Iterator for PopIter<InReader<'a, T>>
|
||||
{
|
||||
type Item = Tagged<T>;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item>
|
||||
{
|
||||
self.reader.pop()
|
||||
}
|
||||
}
|
||||
|
||||
impl_iterator_for_pop_iter_tuple! {1}
|
||||
impl_iterator_for_pop_iter_tuple! {2}
|
||||
impl_iterator_for_pop_iter_tuple! {3}
|
||||
impl_iterator_for_pop_iter_tuple! {4}
|
||||
impl_iterator_for_pop_iter_tuple! {5}
|
||||
impl_iterator_for_pop_iter_tuple! {6}
|
||||
impl_iterator_for_pop_iter_tuple! {7}
|
||||
impl_iterator_for_pop_iter_tuple! {8}
|
||||
impl_iterator_for_pop_iter_tuple! {9}
|
||||
impl_iterator_for_pop_iter_tuple! {10}
|
||||
impl_iterator_for_pop_iter_tuple! {11}
|
||||
impl_iterator_for_pop_iter_tuple! {12}
|
||||
|
||||
/// StreamProducer object for data and tags stored in a type
|
||||
/// agnostic/erased way.
|
||||
///
|
||||
/// This is needed for the graph system to manipulate and pass arround these objects
|
||||
/// as they can't/don't know about the generic types of the stream objects
|
||||
pub struct AnonymousStreamProducer
|
||||
{
|
||||
inner: Box<dyn Any>,
|
||||
inner_tag: StreamProducer<TagSlot>,
|
||||
}
|
||||
|
||||
/// StreamConsumer object for data and tags stored in a type
|
||||
/// agnostic/erased way.
|
||||
///
|
||||
/// This is needed for the graph system to manipulate and pass arround these objects
|
||||
/// as they can't/don't know about the generic types of the stream objects
|
||||
pub struct AnonymousStreamConsumer
|
||||
{
|
||||
inner: Box<dyn Any>,
|
||||
inner_tag: StreamConsumer<TagSlot>,
|
||||
}
|
||||
|
||||
impl<T: 'static> From<(StreamProducer<T>, StreamProducer<TagSlot>)> for AnonymousStreamProducer
|
||||
{
|
||||
fn from(value: (StreamProducer<T>, StreamProducer<TagSlot>)) -> Self
|
||||
{
|
||||
AnonymousStreamProducer {
|
||||
inner: Box::new(value.0),
|
||||
inner_tag: value.1,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: 'static> From<(StreamConsumer<T>, StreamConsumer<TagSlot>)> for AnonymousStreamConsumer
|
||||
{
|
||||
fn from(value: (StreamConsumer<T>, StreamConsumer<TagSlot>)) -> Self
|
||||
{
|
||||
AnonymousStreamConsumer {
|
||||
inner: Box::new(value.0),
|
||||
inner_tag: value.1,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl AnonymousStreamProducer
|
||||
{
|
||||
pub(crate) fn downcast<T: 'static>(self) -> (StreamProducer<T>, StreamProducer<TagSlot>)
|
||||
{
|
||||
(
|
||||
*self.inner.downcast::<StreamProducer<T>>().unwrap(),
|
||||
self.inner_tag,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl AnonymousStreamConsumer
|
||||
{
|
||||
pub(crate) fn downcast<T: 'static>(self) -> (StreamConsumer<T>, StreamConsumer<TagSlot>)
|
||||
{
|
||||
(
|
||||
*self.inner.downcast::<StreamConsumer<T>>().unwrap(),
|
||||
self.inner_tag,
|
||||
)
|
||||
}
|
||||
}
|
||||
// impl_iterator_for_pop_iter_tuple! {1}
|
||||
// impl_iterator_for_pop_iter_tuple! {2}
|
||||
// impl_iterator_for_pop_iter_tuple! {3}
|
||||
// impl_iterator_for_pop_iter_tuple! {4}
|
||||
// impl_iterator_for_pop_iter_tuple! {5}
|
||||
// impl_iterator_for_pop_iter_tuple! {6}
|
||||
// impl_iterator_for_pop_iter_tuple! {7}
|
||||
// impl_iterator_for_pop_iter_tuple! {8}
|
||||
// impl_iterator_for_pop_iter_tuple! {9}
|
||||
// impl_iterator_for_pop_iter_tuple! {10}
|
||||
// impl_iterator_for_pop_iter_tuple! {11}
|
||||
// impl_iterator_for_pop_iter_tuple! {12}
|
||||
|
||||
@ -1,3 +1,7 @@
|
||||
use std::any::Any;
|
||||
|
||||
use crate::{io::{In, Out}, stream::{self, StreamConsumer, StreamProducer}, tag::TagSlot};
|
||||
|
||||
/// Shared object between a block's input and output objects
|
||||
/// so they can "communicate" and know about each other
|
||||
#[derive(Default)]
|
||||
@ -21,3 +25,158 @@ pub struct BlockIOIndex
|
||||
pub block_index: usize,
|
||||
pub port_index: usize,
|
||||
}
|
||||
|
||||
/// Trait to manipulate a block's input in a type agnostic/erased way
|
||||
pub trait AnonymousIn
|
||||
{
|
||||
/// Inform the input about the index of the blocks it's in, as well as its port index
|
||||
fn set_index(&self, index: BlockIOIndex);
|
||||
|
||||
/// Returns None or the block index of the block, and the block port of the corresponding
|
||||
/// Out object
|
||||
fn get_producer_block(&self) -> Option<BlockIOIndex>;
|
||||
|
||||
/// Sets the internal stream object
|
||||
fn set_anonymous_stream(&mut self, consumer: AnonymousStreamConsumer);
|
||||
}
|
||||
|
||||
/// Trait to manipulate a block's output in a type agnostic/erased way
|
||||
pub trait AnonymousOut
|
||||
{
|
||||
/// Inform the output about the index of the blocks it's in, as well as its port index
|
||||
fn set_index(&self, index: BlockIOIndex);
|
||||
|
||||
/// Sets the internal stream object
|
||||
fn set_anonymous_stream(&mut self, producer: AnonymousStreamProducer);
|
||||
|
||||
/// Returns None or the block index of the block, and the block port of the corresponding
|
||||
/// In object
|
||||
fn get_consumer_block(&self) -> Option<BlockIOIndex>;
|
||||
|
||||
/// Creates the stream with the correct corresponding type, in a type erased way.
|
||||
///
|
||||
/// This delegation of stream creation is necessary to allow the graph to manipulate
|
||||
/// it, as it cannot know about the generic type of the stream.
|
||||
fn create_anonymous_stream(
|
||||
&self,
|
||||
capacity: usize,
|
||||
) -> (AnonymousStreamProducer, AnonymousStreamConsumer);
|
||||
}
|
||||
|
||||
impl<T: 'static> AnonymousIn for In<T>
|
||||
{
|
||||
fn set_index(&self, index: BlockIOIndex)
|
||||
{
|
||||
self.edge.lock().unwrap().to = Some(index);
|
||||
}
|
||||
|
||||
fn get_producer_block(&self) -> Option<BlockIOIndex>
|
||||
{
|
||||
self.edge.lock().unwrap().from
|
||||
}
|
||||
|
||||
fn set_anonymous_stream(&mut self, consumer: AnonymousStreamConsumer)
|
||||
{
|
||||
let (stream, tag_stream) = consumer.downcast::<T>();
|
||||
self.stream = Some(stream);
|
||||
self.tag_stream = Some(tag_stream);
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: 'static> AnonymousOut for Out<T>
|
||||
{
|
||||
fn set_index(&self, index: BlockIOIndex)
|
||||
{
|
||||
self.edge.lock().unwrap().from = Some(index);
|
||||
}
|
||||
|
||||
fn get_consumer_block(&self) -> Option<BlockIOIndex>
|
||||
{
|
||||
self.edge.lock().unwrap().to
|
||||
}
|
||||
|
||||
fn set_anonymous_stream(&mut self, producer: AnonymousStreamProducer)
|
||||
{
|
||||
let (stream, tag_stream) = producer.downcast::<T>();
|
||||
self.stream = Some(stream);
|
||||
self.tag_stream = Some(tag_stream);
|
||||
}
|
||||
|
||||
// Delegate stream creation to Out object
|
||||
// which knows the stream type
|
||||
fn create_anonymous_stream(
|
||||
&self,
|
||||
capacity: usize,
|
||||
) -> (AnonymousStreamProducer, AnonymousStreamConsumer)
|
||||
{
|
||||
let (tx, rx) = stream::bounded_queue::<T>(capacity);
|
||||
let (tx_tag, rx_tag) = stream::bounded_queue::<TagSlot>(capacity);
|
||||
((tx, tx_tag).into(), (rx, rx_tag).into())
|
||||
}
|
||||
}
|
||||
|
||||
/// StreamProducer object for data and tags stored in a type
|
||||
/// agnostic/erased way.
|
||||
///
|
||||
/// This is needed for the graph system to manipulate and pass arround these objects
|
||||
/// as they can't/don't know about the generic types of the stream objects
|
||||
pub struct AnonymousStreamProducer
|
||||
{
|
||||
inner: Box<dyn Any>,
|
||||
inner_tag: StreamProducer<TagSlot>,
|
||||
}
|
||||
|
||||
/// StreamConsumer object for data and tags stored in a type
|
||||
/// agnostic/erased way.
|
||||
///
|
||||
/// This is needed for the graph system to manipulate and pass arround these objects
|
||||
/// as they can't/don't know about the generic types of the stream objects
|
||||
pub struct AnonymousStreamConsumer
|
||||
{
|
||||
inner: Box<dyn Any>,
|
||||
inner_tag: StreamConsumer<TagSlot>,
|
||||
}
|
||||
|
||||
impl<T: 'static> From<(StreamProducer<T>, StreamProducer<TagSlot>)> for AnonymousStreamProducer
|
||||
{
|
||||
fn from(value: (StreamProducer<T>, StreamProducer<TagSlot>)) -> Self
|
||||
{
|
||||
AnonymousStreamProducer {
|
||||
inner: Box::new(value.0),
|
||||
inner_tag: value.1,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: 'static> From<(StreamConsumer<T>, StreamConsumer<TagSlot>)> for AnonymousStreamConsumer
|
||||
{
|
||||
fn from(value: (StreamConsumer<T>, StreamConsumer<TagSlot>)) -> Self
|
||||
{
|
||||
AnonymousStreamConsumer {
|
||||
inner: Box::new(value.0),
|
||||
inner_tag: value.1,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl AnonymousStreamProducer
|
||||
{
|
||||
pub(crate) fn downcast<T: 'static>(self) -> (StreamProducer<T>, StreamProducer<TagSlot>)
|
||||
{
|
||||
(
|
||||
*self.inner.downcast::<StreamProducer<T>>().unwrap(),
|
||||
self.inner_tag,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl AnonymousStreamConsumer
|
||||
{
|
||||
pub(crate) fn downcast<T: 'static>(self) -> (StreamConsumer<T>, StreamConsumer<TagSlot>)
|
||||
{
|
||||
(
|
||||
*self.inner.downcast::<StreamConsumer<T>>().unwrap(),
|
||||
self.inner_tag,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,4 +1,3 @@
|
||||
use std::cell::Cell;
|
||||
use std::cell::UnsafeCell;
|
||||
use std::mem::MaybeUninit;
|
||||
use std::ops::Deref;
|
||||
@ -53,36 +52,35 @@ unsafe impl<T: Send> Sync for StreamProducer<T> {}
|
||||
unsafe impl<T: Send> Send for StreamConsumer<T> {}
|
||||
unsafe impl<T: Send> Sync for StreamConsumer<T> {}
|
||||
|
||||
// Represents a write operation within a stream producer
|
||||
pub struct StreamWriter<'a, T>
|
||||
{
|
||||
producer: &'a mut StreamProducer<T>,
|
||||
first: &'a UnsafeCell<[MaybeUninit<T>]>,
|
||||
second: Option<&'a UnsafeCell<[MaybeUninit<T>]>>,
|
||||
first_len: usize,
|
||||
second_len: usize,
|
||||
written: Cell<usize>,
|
||||
#[repr(transparent)]
|
||||
pub struct Takable<T>(MaybeUninit<T>);
|
||||
|
||||
// Index of the first element to be pushed
|
||||
// within the "infinite buffer"
|
||||
// Used to number tags
|
||||
start_index: usize,
|
||||
impl<T> Takable<T>
|
||||
{
|
||||
pub fn new(element: T) -> Self
|
||||
{
|
||||
Takable(MaybeUninit::new(element))
|
||||
}
|
||||
|
||||
pub unsafe fn take(&mut self) -> T
|
||||
{
|
||||
unsafe { std::mem::replace(&mut self.0, MaybeUninit::uninit()).assume_init() }
|
||||
}
|
||||
|
||||
pub unsafe fn peek(&self) -> &T
|
||||
{
|
||||
unsafe { self.0.assume_init_ref() }
|
||||
}
|
||||
|
||||
pub unsafe fn peek_mut(&mut self) -> &mut T
|
||||
{
|
||||
unsafe { self.0.assume_init_mut() }
|
||||
}
|
||||
}
|
||||
|
||||
// Represents a read operation within a stream producer
|
||||
pub struct StreamReader<'a, T>
|
||||
unsafe fn takable_slice_from_maybe_uninitt<T>(slice: &mut [MaybeUninit<T>]) -> &mut [Takable<T>]
|
||||
{
|
||||
producer: &'a StreamConsumer<T>,
|
||||
first: &'a UnsafeCell<[MaybeUninit<T>]>,
|
||||
second: Option<&'a UnsafeCell<[MaybeUninit<T>]>>,
|
||||
first_len: usize,
|
||||
second_len: usize,
|
||||
read: Cell<usize>,
|
||||
|
||||
// Index of the first element to be read
|
||||
// within the "infinite buffer"
|
||||
// Used to number tags
|
||||
start_index: usize,
|
||||
unsafe { std::mem::transmute(slice) }
|
||||
}
|
||||
|
||||
pub fn bounded_queue<T>(capacity: usize) -> (StreamProducer<T>, StreamConsumer<T>)
|
||||
@ -121,8 +119,78 @@ pub fn bounded_queue<T>(capacity: usize) -> (StreamProducer<T>, StreamConsumer<T
|
||||
)
|
||||
}
|
||||
|
||||
pub struct StreamReader<'a, T>
|
||||
{
|
||||
slices: (&'a mut [Takable<T>], &'a mut [Takable<T>]),
|
||||
|
||||
// UNSAFE !
|
||||
inner: &'a mut StreamConsumer<T>,
|
||||
}
|
||||
|
||||
pub struct StreamWriter<'a, T>
|
||||
{
|
||||
slices: (&'a mut [MaybeUninit<T>], &'a mut [MaybeUninit<T>]),
|
||||
|
||||
// UNSAFE !
|
||||
inner: &'a mut StreamProducer<T>,
|
||||
}
|
||||
|
||||
impl<'a, T> StreamReader<'a, T>
|
||||
{
|
||||
pub fn slices(&self) -> (&[Takable<T>], &[Takable<T>])
|
||||
{
|
||||
(self.slices.0, self.slices.1)
|
||||
}
|
||||
|
||||
pub fn slices_mut(&mut self) -> (&mut [Takable<T>], &mut [Takable<T>])
|
||||
{
|
||||
(self.slices.0, self.slices.1)
|
||||
}
|
||||
|
||||
pub fn consume(&mut self, read: usize)
|
||||
{
|
||||
self.inner.consume(read);
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, T> StreamWriter<'a, T>
|
||||
{
|
||||
pub fn slices(&self) -> (&[MaybeUninit<T>], &[MaybeUninit<T>])
|
||||
{
|
||||
(self.slices.0, self.slices.1)
|
||||
}
|
||||
|
||||
pub fn slices_mut(&mut self) -> (&mut [MaybeUninit<T>], &mut [MaybeUninit<T>])
|
||||
{
|
||||
(self.slices.0, self.slices.1)
|
||||
}
|
||||
|
||||
pub fn produce(&mut self, written: usize)
|
||||
{
|
||||
self.inner.produce(written);
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> StreamProducer<T>
|
||||
{
|
||||
pub fn first_index(&self) -> usize
|
||||
{
|
||||
self.inner.head.load(Ordering::Relaxed)
|
||||
}
|
||||
|
||||
pub fn produce(&mut self, written: usize)
|
||||
{
|
||||
// Advance head.
|
||||
let head = self.inner.head.load(Ordering::Relaxed);
|
||||
let tail = self.inner.tail.load(Ordering::Relaxed);
|
||||
|
||||
// Check bounds
|
||||
assert!(head + written - tail <= (self.inner.capacity_mask + 1));
|
||||
|
||||
// We want writes to the buffer to be visible when acquired in the pop side
|
||||
self.inner.head.store(head + written, Ordering::Release);
|
||||
}
|
||||
|
||||
pub fn write<'a>(&'a mut self) -> StreamWriter<'a, T>
|
||||
{
|
||||
// We need to claim the maximum amount of elements.
|
||||
@ -156,57 +224,26 @@ impl<T> StreamProducer<T>
|
||||
let k = &mut *self.inner.buffer.get();
|
||||
|
||||
let (start_to_head, head_to_end) = k.split_at_mut_unchecked(wrapped_head);
|
||||
let (start_to_tail, _tail_to_head) =
|
||||
start_to_head.split_at_mut_unchecked(wrapped_tail);
|
||||
|
||||
// Slices are wrapped into unsafe cells to provide interior mutability
|
||||
// On the stream as it is much more convienient.
|
||||
//
|
||||
// SAFETY:
|
||||
//
|
||||
// This functions borrows the stream mutably. As such, only one instance
|
||||
// of StreamWriter can exist for a given stream. The StreamWriter
|
||||
// is thus the only on able to write or read the stream when it lives
|
||||
let first_len = head_to_end.len();
|
||||
let second_len = start_to_tail.len();
|
||||
let first = std::mem::transmute::<
|
||||
&mut [MaybeUninit<T>],
|
||||
&UnsafeCell<[MaybeUninit<T>]>,
|
||||
>(head_to_end);
|
||||
let second = Some(std::mem::transmute::<
|
||||
&mut [MaybeUninit<T>],
|
||||
&UnsafeCell<[MaybeUninit<T>]>,
|
||||
>(start_to_tail));
|
||||
// of these slices can exist for a given stream.
|
||||
StreamWriter {
|
||||
start_index: head,
|
||||
|
||||
producer: self,
|
||||
first,
|
||||
second,
|
||||
first_len,
|
||||
second_len,
|
||||
written: 0.into(),
|
||||
slices: (head_to_end, start_to_head),
|
||||
inner: self,
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// We MUST have : tail < head
|
||||
|
||||
if wrapped_tail < wrapped_head
|
||||
{
|
||||
//
|
||||
// Or
|
||||
// ▯▯▯▯▯▯▯▯▯▯▯▯▯
|
||||
// |
|
||||
// tail & head
|
||||
// (empty)
|
||||
// Current configuration :
|
||||
// ▯▯▯▮▮▮▮▮▮▯▯▯▯
|
||||
// | |
|
||||
// tail head
|
||||
// ___ ____
|
||||
// slice1 slice2
|
||||
// slice2 slice1
|
||||
|
||||
// SAFETY:
|
||||
//
|
||||
@ -223,27 +260,9 @@ impl<T> StreamProducer<T>
|
||||
let (start_to_tail, _tail_to_head) =
|
||||
start_to_head.split_at_mut_unchecked(wrapped_tail);
|
||||
|
||||
let first_len = head_to_end.len();
|
||||
let second_len = start_to_tail.len();
|
||||
|
||||
let first = std::mem::transmute::<
|
||||
&mut [MaybeUninit<T>],
|
||||
&UnsafeCell<[std::mem::MaybeUninit<T>]>,
|
||||
>(head_to_end);
|
||||
let second = Some(std::mem::transmute::<
|
||||
&mut [MaybeUninit<T>],
|
||||
&UnsafeCell<[MaybeUninit<T>]>,
|
||||
>(start_to_tail));
|
||||
|
||||
StreamWriter {
|
||||
start_index: head,
|
||||
|
||||
producer: self,
|
||||
first,
|
||||
second,
|
||||
first_len,
|
||||
second_len,
|
||||
written: 0.into(),
|
||||
slices: (head_to_end, start_to_tail),
|
||||
inner: self,
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -261,8 +280,8 @@ impl<T> StreamProducer<T>
|
||||
// |
|
||||
// tail & head
|
||||
// (full)
|
||||
// ______.______
|
||||
// slice2 slice1
|
||||
// .
|
||||
// slice1
|
||||
|
||||
// SAFETY:
|
||||
//
|
||||
@ -272,19 +291,14 @@ impl<T> StreamProducer<T>
|
||||
// Head and tail are both indices of the slice
|
||||
unsafe {
|
||||
let k = &mut *self.inner.buffer.get();
|
||||
let len = wrapped_tail - wrapped_head;
|
||||
StreamWriter {
|
||||
start_index: head,
|
||||
let (start_to_tail, _tail_to_end) = k.split_at_mut_unchecked(wrapped_tail);
|
||||
let (_start_to_head, head_to_tail) =
|
||||
start_to_tail.split_at_mut_unchecked(wrapped_head);
|
||||
let (empty_slice, head_to_tail) = head_to_tail.split_at_mut_unchecked(0);
|
||||
|
||||
producer: self,
|
||||
first_len: len,
|
||||
second_len: 0,
|
||||
first: std::mem::transmute::<
|
||||
&[MaybeUninit<T>],
|
||||
&UnsafeCell<[MaybeUninit<T>]>,
|
||||
>(&k[wrapped_head..wrapped_tail]),
|
||||
second: None,
|
||||
written: 0.into(),
|
||||
StreamWriter {
|
||||
slices: (head_to_tail, empty_slice),
|
||||
inner: self,
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -294,7 +308,25 @@ impl<T> StreamProducer<T>
|
||||
|
||||
impl<T> StreamConsumer<T>
|
||||
{
|
||||
pub fn read<'a>(&'a mut self) -> StreamReader<'a, T>
|
||||
pub fn consume(&mut self, read: usize)
|
||||
{
|
||||
// Advance head.
|
||||
let head = self.inner.head.load(Ordering::Relaxed);
|
||||
let tail = self.inner.tail.load(Ordering::Relaxed);
|
||||
|
||||
// Check bounds
|
||||
assert!(tail + read <= head);
|
||||
|
||||
// We want writes to the buffer to be visible when acquired in the pop side
|
||||
self.inner.tail.store(tail + read, Ordering::Release);
|
||||
}
|
||||
|
||||
pub fn first_index(&self) -> usize
|
||||
{
|
||||
self.inner.tail.load(Ordering::Relaxed)
|
||||
}
|
||||
|
||||
pub fn read_takable<'a>(&'a mut self) -> StreamReader<'a, T>
|
||||
{
|
||||
// We need to claim the maximum amount of elements.
|
||||
let head = self.inner.head.load(Ordering::Acquire);
|
||||
@ -311,24 +343,20 @@ impl<T> StreamConsumer<T>
|
||||
// Buffer is empty. Return empty slice
|
||||
unsafe {
|
||||
let k = &mut *self.inner.buffer.get();
|
||||
let len = wrapped_head - wrapped_tail;
|
||||
let empty = &mut k[0..0];
|
||||
let (empty_1, empty_2) = empty.split_at_mut_unchecked(0);
|
||||
StreamReader {
|
||||
start_index: tail,
|
||||
|
||||
producer: self,
|
||||
first_len: len,
|
||||
second_len: 0,
|
||||
first: std::mem::transmute::<&[MaybeUninit<T>], &UnsafeCell<[MaybeUninit<T>]>>(
|
||||
&k[wrapped_tail..wrapped_head],
|
||||
slices: (
|
||||
takable_slice_from_maybe_uninitt(empty_1),
|
||||
takable_slice_from_maybe_uninitt(empty_2),
|
||||
),
|
||||
second: None,
|
||||
read: 0.into(),
|
||||
inner: self,
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// Necessarly: wrapped_tail < wrapped_head
|
||||
// Necessarly: wrapped_tail <= wrapped_head
|
||||
// Two cases : The buffer overlaps the wrapping or not
|
||||
if wrapped_tail < wrapped_head
|
||||
{
|
||||
@ -346,20 +374,17 @@ impl<T> StreamConsumer<T>
|
||||
//
|
||||
// Head and tail are both indices of the slice
|
||||
unsafe {
|
||||
let k = &mut *self.inner.buffer.get();
|
||||
let len = wrapped_head - wrapped_tail;
|
||||
StreamReader {
|
||||
start_index: tail,
|
||||
let k = &mut (&mut *self.inner.buffer.get())[wrapped_tail..wrapped_head];
|
||||
let (tail_to_head, empty_slice) =
|
||||
k.split_at_mut_unchecked(wrapped_head - wrapped_tail);
|
||||
assert_eq!(empty_slice.len(), 0);
|
||||
|
||||
producer: self,
|
||||
first_len: len,
|
||||
second_len: 0,
|
||||
first: std::mem::transmute::<
|
||||
&[MaybeUninit<T>],
|
||||
&UnsafeCell<[MaybeUninit<T>]>,
|
||||
>(&k[wrapped_tail..wrapped_head]),
|
||||
second: None,
|
||||
read: 0.into(),
|
||||
StreamReader {
|
||||
slices: (
|
||||
takable_slice_from_maybe_uninitt(tail_to_head),
|
||||
takable_slice_from_maybe_uninitt(empty_slice),
|
||||
),
|
||||
inner: self,
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -395,339 +420,33 @@ impl<T> StreamConsumer<T>
|
||||
let (start_to_head, _head_to_tail) =
|
||||
start_to_tail.split_at_mut_unchecked(wrapped_head);
|
||||
|
||||
let first_len = tail_to_end.len();
|
||||
let second_len = start_to_head.len();
|
||||
|
||||
let first = std::mem::transmute::<
|
||||
&mut [MaybeUninit<T>],
|
||||
&UnsafeCell<[MaybeUninit<T>]>,
|
||||
>(tail_to_end);
|
||||
let second = Some(std::mem::transmute::<
|
||||
&mut [MaybeUninit<T>],
|
||||
&UnsafeCell<[MaybeUninit<T>]>,
|
||||
>(start_to_head));
|
||||
|
||||
StreamReader {
|
||||
start_index: tail,
|
||||
|
||||
producer: self,
|
||||
first,
|
||||
second,
|
||||
first_len,
|
||||
second_len,
|
||||
read: 0.into(),
|
||||
slices: (
|
||||
takable_slice_from_maybe_uninitt(tail_to_end),
|
||||
takable_slice_from_maybe_uninitt(start_to_head),
|
||||
),
|
||||
inner: self,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a reader of contiguous elements that
|
||||
/// satisfy the predicate
|
||||
pub fn read_while<F>(&mut self, predicate: F) -> StreamReader<'_, T>
|
||||
where
|
||||
F: Fn(&T) -> bool,
|
||||
{
|
||||
// Take a normal reader. This contains available elements to read.
|
||||
let mut reader = self.read();
|
||||
|
||||
// We need to trim the slices to keep only the satified elements
|
||||
|
||||
// First slice
|
||||
let mut first_kept = 0;
|
||||
// SAFETY:
|
||||
//
|
||||
// Only us can have a reference to these slices of the buffer
|
||||
for element in unsafe { &*reader.first.get() }
|
||||
{
|
||||
// SAFETY
|
||||
//
|
||||
// If this element is in a reader returned by self.read
|
||||
// with no pop called, we know it is initialized
|
||||
let init_element = unsafe { element.assume_init_ref() };
|
||||
let sat = predicate(init_element);
|
||||
if !sat
|
||||
{
|
||||
// Stop here
|
||||
// Forget about second slice
|
||||
reader.second_len = 0;
|
||||
reader.second = None;
|
||||
|
||||
// Trim first slice
|
||||
reader.first_len = first_kept;
|
||||
unsafe {
|
||||
reader.first = std::mem::transmute::<
|
||||
&[MaybeUninit<T>],
|
||||
&UnsafeCell<[MaybeUninit<T>]>,
|
||||
>(&(&*reader.first.get())[0..first_kept]);
|
||||
}
|
||||
|
||||
return reader;
|
||||
}
|
||||
first_kept += 1;
|
||||
}
|
||||
|
||||
// If we are here, all of the elements of the first slice, satisfy the predicate
|
||||
|
||||
if let Some(second_slice) = &mut reader.second
|
||||
{
|
||||
// Second slice
|
||||
let mut second_kept = 0;
|
||||
// SAFETY:
|
||||
//
|
||||
// Only us can have a reference to these slices of the buffer
|
||||
for element in unsafe { &*second_slice.get() }
|
||||
{
|
||||
// SAFETY
|
||||
//
|
||||
// If this element is in a reader returned by self.read
|
||||
// with no pop called, we know it is initialized
|
||||
let init_element = unsafe { element.assume_init_ref() };
|
||||
let sat = predicate(init_element);
|
||||
if !sat
|
||||
{
|
||||
// Stop here
|
||||
// Trim second slice
|
||||
reader.second_len = second_kept;
|
||||
unsafe {
|
||||
reader.second = Some(std::mem::transmute::<
|
||||
&[MaybeUninit<T>],
|
||||
&UnsafeCell<[MaybeUninit<T>]>,
|
||||
>(
|
||||
&(&*second_slice.get())[0..first_kept]
|
||||
));
|
||||
}
|
||||
return reader;
|
||||
}
|
||||
second_kept += 1;
|
||||
}
|
||||
}
|
||||
|
||||
return reader;
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, T> StreamWriter<'a, T>
|
||||
{
|
||||
pub fn len(&self) -> usize
|
||||
{
|
||||
self.first_len + self.second_len
|
||||
}
|
||||
|
||||
pub fn is_empty(&self) -> bool
|
||||
{
|
||||
self.len() == 0
|
||||
}
|
||||
|
||||
pub fn next_index(&self) -> usize
|
||||
{
|
||||
self.start_index + self.written.get()
|
||||
}
|
||||
|
||||
pub fn push(&self, element: T) -> Result<(), T>
|
||||
{
|
||||
if self.written.get() < self.first_len
|
||||
{
|
||||
unsafe {
|
||||
(&mut *self.first.get())[self.written.get()] = MaybeUninit::new(element);
|
||||
}
|
||||
self.written.set(self.written.get() + 1);
|
||||
Ok(())
|
||||
}
|
||||
else if let Some(second) = &self.second
|
||||
&& self.written.get() - self.first_len < self.second_len
|
||||
{
|
||||
unsafe {
|
||||
(&mut *second.get())[self.written.get() - self.first_len] =
|
||||
MaybeUninit::new(element);
|
||||
}
|
||||
self.written.set(self.written.get() + 1);
|
||||
Ok(())
|
||||
}
|
||||
else
|
||||
{
|
||||
Err(element)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn write(&self, length: usize)
|
||||
{
|
||||
let new = self.written.get() + length;
|
||||
assert!(new < self.len());
|
||||
self.written.set(new);
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, T: Copy> StreamWriter<'a, T>
|
||||
{
|
||||
pub fn slices_mut(&mut self) -> (&mut [MaybeUninit<T>], &mut [MaybeUninit<T>])
|
||||
{
|
||||
unsafe {
|
||||
(
|
||||
&mut *self.first.get(),
|
||||
self.second
|
||||
.map(|x| &mut *x.get())
|
||||
.unwrap_or_else(|| &mut(&mut *self.first.get())[0..0]),
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, T> StreamReader<'a, T>
|
||||
{
|
||||
pub fn len(&self) -> usize
|
||||
{
|
||||
self.first_len + self.second_len
|
||||
}
|
||||
|
||||
pub fn is_empty(&self) -> bool
|
||||
{
|
||||
self.len() == 0
|
||||
}
|
||||
|
||||
pub fn last_index(&self) -> usize
|
||||
{
|
||||
self.start_index + self.len()
|
||||
}
|
||||
|
||||
pub fn next_index(&self) -> usize
|
||||
{
|
||||
self.start_index + self.read.get()
|
||||
}
|
||||
|
||||
pub fn pop_with_index(&self) -> Option<(T, usize)>
|
||||
{
|
||||
let index = self.next_index();
|
||||
self.pop().map(|t| (t, index))
|
||||
}
|
||||
|
||||
pub fn peek<F, O>(&self, peeker: F) -> Option<O>
|
||||
where
|
||||
F: Fn(&T) -> O,
|
||||
{
|
||||
// Same as pop, without taking, or increasing read count
|
||||
if self.read.get() < self.first_len
|
||||
{
|
||||
// SAFETY:
|
||||
//
|
||||
// If element is in this slice, it is initialized.
|
||||
// We take it once since read increases
|
||||
let element = unsafe { (&mut *self.first.get())[self.read.get()].assume_init_ref() };
|
||||
Some(peeker(element))
|
||||
}
|
||||
else if let Some(second) = &self.second
|
||||
&& self.read.get() - self.first_len < self.second_len
|
||||
{
|
||||
let element =
|
||||
unsafe { (&mut *second.get())[self.read.get() - self.first_len].assume_init_ref() };
|
||||
Some(peeker(element))
|
||||
}
|
||||
else
|
||||
{
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
pub fn pop(&self) -> Option<T>
|
||||
{
|
||||
if self.read.get() < self.first_len
|
||||
{
|
||||
// SAFETY:
|
||||
//
|
||||
// If element is in this slice, it is initialized.
|
||||
// We take it once since read increases
|
||||
let element = unsafe {
|
||||
std::mem::replace(
|
||||
&mut (&mut *self.first.get())[self.read.get()],
|
||||
MaybeUninit::uninit(),
|
||||
)
|
||||
.assume_init()
|
||||
};
|
||||
self.read.set(self.read.get() + 1);
|
||||
Some(element)
|
||||
}
|
||||
else if let Some(second) = &self.second
|
||||
&& self.read.get() - self.first_len < self.second_len
|
||||
{
|
||||
let element = unsafe {
|
||||
std::mem::replace(
|
||||
&mut (&mut *second.get())[self.read.get() - self.first_len],
|
||||
MaybeUninit::uninit(),
|
||||
)
|
||||
.assume_init()
|
||||
};
|
||||
self.read.set(self.read.get() + 1);
|
||||
Some(element)
|
||||
}
|
||||
else
|
||||
{
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
pub fn read(&self, length: usize)
|
||||
{
|
||||
let new = self.read.get() + length;
|
||||
assert!(new < self.len());
|
||||
self.read.set(new);
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, T: Copy> StreamReader<'a, T>
|
||||
{
|
||||
pub fn slices(&self) -> (&[T], &[T])
|
||||
{
|
||||
unsafe {
|
||||
(
|
||||
std::mem::transmute::<&[MaybeUninit<T>], &[T]>(&*self.first.get()),
|
||||
std::mem::transmute::<&[MaybeUninit<T>], &[T]>(
|
||||
self.second
|
||||
.map(|x| &*x.get())
|
||||
.unwrap_or_else(|| &(&*self.first.get())[0..0]),
|
||||
),
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// When a Stream writer goes out of scope, it wrote
|
||||
// some things into the stream. These things need to be commited to the queue
|
||||
impl<'a, T> Drop for StreamWriter<'a, T>
|
||||
{
|
||||
fn drop(&mut self)
|
||||
{
|
||||
// Advance head.
|
||||
// We know that this value hasn't changed since this StreamWriter was created
|
||||
// let head = self.producer.inner.head.load(Ordering::Relaxed);
|
||||
|
||||
// We want writes to the buffer to be visible when acquired in the pop side
|
||||
self.producer
|
||||
.inner
|
||||
.head
|
||||
.store(self.start_index + self.written.get(), Ordering::Release);
|
||||
}
|
||||
}
|
||||
|
||||
// When a Stream reader goes out of scope, it took
|
||||
// some things from the stream. These things need to be de-commited to the queue
|
||||
impl<'a, T> Drop for StreamReader<'a, T>
|
||||
{
|
||||
fn drop(&mut self)
|
||||
{
|
||||
// Advance tail.
|
||||
// We know that this value hasn't changed since this StreamWriter was created
|
||||
// let tail = self.producer.inner.tail.load(Ordering::Relaxed);
|
||||
|
||||
// We want writes to the buffer to be visible when acquired in the push side
|
||||
self.producer
|
||||
.inner
|
||||
.tail
|
||||
.store(self.start_index + self.read.get(), Ordering::Release);
|
||||
}
|
||||
}
|
||||
// impl<T: Copy> StreamConsumer<T>
|
||||
// {
|
||||
// pub fn read(&mut self) -> (&[T], &[T])
|
||||
// {
|
||||
// let (slice_1, slice_2) = self.read_takable();
|
||||
// unsafe { (std::mem::transmute(slice_1), std::mem::transmute(slice_2)) }
|
||||
// }
|
||||
// }
|
||||
|
||||
mod test
|
||||
{
|
||||
#[allow(unused_imports)]
|
||||
use std::mem::MaybeUninit;
|
||||
|
||||
#[allow(unused_imports)]
|
||||
use crate::stream::bounded_queue;
|
||||
|
||||
@ -738,57 +457,76 @@ mod test
|
||||
let (mut tx, mut rx) = bounded_queue::<usize>(4);
|
||||
|
||||
{
|
||||
let writer = tx.write();
|
||||
let mut writer = tx.write();
|
||||
let (a, b) = writer.slices_mut();
|
||||
assert_eq!(a.len(), 4);
|
||||
assert_eq!(b.len(), 0);
|
||||
|
||||
assert_eq!(writer.len(), 4);
|
||||
a[0] = MaybeUninit::new(0);
|
||||
a[1] = MaybeUninit::new(1);
|
||||
a[2] = MaybeUninit::new(2);
|
||||
a[3] = MaybeUninit::new(3);
|
||||
|
||||
assert_eq!(writer.push(1), Ok(()));
|
||||
assert_eq!(writer.push(2), Ok(()));
|
||||
assert_eq!(writer.push(3), Ok(()));
|
||||
assert_eq!(writer.push(4), Ok(()));
|
||||
assert_eq!(writer.push(5), Err(5));
|
||||
tx.produce(4);
|
||||
}
|
||||
|
||||
{
|
||||
let reader = rx.read();
|
||||
let mut reader = rx.read_takable();
|
||||
let (a, b) = reader.slices_mut();
|
||||
assert_eq!(a.len(), 4);
|
||||
assert_eq!(b.len(), 0);
|
||||
|
||||
assert_eq!(reader.len(), 4);
|
||||
unsafe {
|
||||
assert_eq!(a[0].take(), 0);
|
||||
assert_eq!(a[1].take(), 1);
|
||||
assert_eq!(a[2].take(), 2);
|
||||
assert_eq!(a[3].take(), 3);
|
||||
}
|
||||
|
||||
assert_eq!(reader.pop(), Some(1));
|
||||
assert_eq!(reader.pop(), Some(2));
|
||||
assert_eq!(reader.pop(), Some(3));
|
||||
assert_eq!(reader.pop(), Some(4));
|
||||
assert_eq!(reader.pop(), None);
|
||||
rx.consume(4);
|
||||
}
|
||||
|
||||
// Put stream into weird situatino
|
||||
// Put stream into weird situation
|
||||
{
|
||||
let writer = tx.write();
|
||||
assert_eq!(writer.push(1), Ok(()));
|
||||
assert_eq!(writer.push(2), Ok(()));
|
||||
assert_eq!(writer.push(3), Ok(()));
|
||||
assert_eq!(writer.push(4), Ok(()));
|
||||
let mut writer = tx.write();
|
||||
let (a, b) = writer.slices_mut();
|
||||
assert_eq!(a.len(), 4);
|
||||
assert_eq!(b.len(), 0);
|
||||
|
||||
a[0] = MaybeUninit::new(0);
|
||||
a[1] = MaybeUninit::new(1);
|
||||
a[2] = MaybeUninit::new(2);
|
||||
|
||||
tx.produce(3);
|
||||
}
|
||||
|
||||
{
|
||||
let reader = rx.read();
|
||||
assert_eq!(reader.pop(), Some(1));
|
||||
assert_eq!(reader.pop(), Some(2));
|
||||
let mut reader = rx.read_takable();
|
||||
let (a, b) = reader.slices_mut();
|
||||
assert_eq!(a.len(), 3);
|
||||
assert_eq!(b.len(), 0);
|
||||
|
||||
unsafe {
|
||||
assert_eq!(a[0].take(), 0);
|
||||
assert_eq!(a[1].take(), 1);
|
||||
assert_eq!(a[2].take(), 2);
|
||||
}
|
||||
|
||||
rx.consume(1);
|
||||
}
|
||||
|
||||
{
|
||||
let writer = tx.write();
|
||||
assert_eq!(writer.len(), 2);
|
||||
assert_eq!(writer.push(5), Ok(()));
|
||||
assert_eq!(writer.push(6), Ok(()));
|
||||
let (a, b) = writer.slices();
|
||||
assert_eq!(a.len(), 1);
|
||||
assert_eq!(b.len(), 1);
|
||||
}
|
||||
|
||||
{
|
||||
let reader = rx.read();
|
||||
assert_eq!(reader.pop(), Some(3));
|
||||
assert_eq!(reader.pop(), Some(4));
|
||||
assert_eq!(reader.pop(), Some(5));
|
||||
assert_eq!(reader.pop(), Some(6));
|
||||
let reader = rx.read_takable();
|
||||
let (a, b) = reader.slices();
|
||||
assert_eq!(a.len(), 2);
|
||||
assert_eq!(b.len(), 0);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -178,7 +178,7 @@ impl Tag
|
||||
}
|
||||
|
||||
/// Creates a new tag, which is the combination of the given tags
|
||||
pub fn from_tags<const N: usize>(tag_opts: [&Tag; N]) -> Tag
|
||||
pub fn from_tags<const N: usize>(tag_opts: &[&Tag; N]) -> Tag
|
||||
{
|
||||
let new_tag = Self::default();
|
||||
{
|
||||
@ -197,24 +197,29 @@ impl Tag
|
||||
///
|
||||
/// If all the tag options are None, None is returned
|
||||
/// Otherwise it is Some of the combination of all of the tags which are Some
|
||||
pub fn from_tag_opts<const N: usize>(tag_opts: [&Option<Tag>; N]) -> Option<Tag>
|
||||
pub fn from_tag_opts<'a>(mut tag_opts: impl Iterator<Item = &'a Option<Tag>>) -> Option<Tag>
|
||||
{
|
||||
if tag_opts.iter().all(|t| t.is_none())
|
||||
{
|
||||
return None;
|
||||
}
|
||||
|
||||
let new_tag = Self::default();
|
||||
let mut some_tags = 0;
|
||||
{
|
||||
let mut writer = new_tag.data.write().unwrap();
|
||||
|
||||
for tag in tag_opts.iter().filter(|t| t.is_some())
|
||||
for tag in tag_opts.filter(|t| t.is_some())
|
||||
{
|
||||
some_tags += 1;
|
||||
let reader = tag.as_ref().unwrap().data.read().unwrap();
|
||||
writer.extend(reader.iter().map(|x| (*x.0, x.1.clone())));
|
||||
}
|
||||
}
|
||||
Some(new_tag)
|
||||
if some_tags > 0
|
||||
{
|
||||
Some(new_tag)
|
||||
}
|
||||
else
|
||||
{
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
/// Adds a new entry in the tag. If it already exists, it is overwritten
|
||||
@ -256,7 +261,7 @@ impl TagMergable<Tag> for Tag
|
||||
{
|
||||
fn merge(&self, other: &Self) -> Self
|
||||
{
|
||||
Self::from_tags([self, other])
|
||||
Self::from_tags(&[self, other])
|
||||
}
|
||||
}
|
||||
|
||||
@ -264,7 +269,7 @@ impl TagMergable<Option<Tag>> for Option<Tag>
|
||||
{
|
||||
fn merge(&self, other: &Self) -> Self
|
||||
{
|
||||
Tag::from_tag_opts([self, other])
|
||||
Tag::from_tag_opts([self, other].into_iter())
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user