This commit is contained in:
2026-04-11 10:03:22 +02:00
parent 81cac2f239
commit 87921968b4
23 changed files with 1115 additions and 663 deletions

BIN
examples/qpsk-modem/mod.wav Normal file

Binary file not shown.

Binary file not shown.

View File

@ -1,61 +1,351 @@
use std::time::Instant;
use std::cell::RefCell;
use std::os::unix::thread;
use std::time::Duration;
use cpal::traits::DeviceTrait;
use cpal::traits::HostTrait;
use cpal::traits::StreamTrait;
use egui::Color32;
use egui_plot::Line;
use egui_plot::PlotPoints;
use egui_plot::Points;
use num::Complex;
use num::complex::ComplexFloat;
use num::traits::sign;
use oxydsp_dsp::blocks::filtering::fir::FirFilter;
use oxydsp_dsp::blocks::filtering::pulse_shaping::PulseShaper;
use oxydsp_dsp::blocks::iq::zero_if::ZeroIf;
use oxydsp_dsp::blocks::math::basic::Multiplier;
use oxydsp_dsp::blocks::synthesis::OscillatorSource;
use oxydsp_dsp::blocks::utilities::adapters::{Map, NullSink, Scan};
use oxydsp_dsp::blocks::utilities::adapters::Map;
use oxydsp_dsp::blocks::utilities::adapters::NullSink;
use oxydsp_dsp::blocks::utilities::adapters::Scan;
use oxydsp_dsp::blocks::utilities::channels::RxSource;
use oxydsp_dsp::blocks::utilities::channels::TxSink;
use oxydsp_dsp::blocks::utilities::graph_control::GraphKiller;
use oxydsp_dsp::blocks::utilities::iter::IterSource;
use oxydsp_dsp::filtering::fir::Fir;
use oxydsp_dsp::filtering::history_buf::HistoryBuf;
use oxydsp_dsp::units::DigitalFrequency;
use oxydsp_flowgraph::BlockIO;
use oxydsp_flowgraph::block::Block;
use oxydsp_flowgraph::flowgraph;
use rand::{RngExt, SeedableRng, random};
use oxydsp_flowgraph::io::In;
use oxydsp_flowgraph::io::Out;
use oxydsp_flowgraph::stream;
use oxydsp_flowgraph::tag::Tags;
use rand::random;
const CARRRIER_FREQ: f64 = 1000.;
const SAMPLE_RATE: usize = 48_000;
const SAMPLE_PER_SYMBOL: usize = 100;
fn main()
#[derive(BlockIO)]
pub struct CostasLoop
{
let bits = (0..1024).map(|_| [random::<bool>(), random::<bool>()]);
#[input]
input: In<f32>,
let (iter_source, bits) = IterSource::new(bits.cycle());
let (iq_map, iq) = Map::new(bits, |x| match x
#[output]
output: Out<Complex<f32>>,
center_freq: DigitalFrequency,
nco: oxydsp_dsp::synthesis::oscillator::Nco<f32>,
loop_filter: oxydsp_dsp::filtering::fir::FirFilter<f32, f32, f32>,
low_pass: oxydsp_dsp::filtering::fir::FirFilter<Complex<f32>, Complex<f32>, Complex<f32>>,
}
impl CostasLoop
{
pub fn new(
input: In<f32>,
start_frequency: DigitalFrequency,
loop_filter: Fir<f32>,
) -> (Self, In<Complex<f32>>)
{
let (output, iq) = oxydsp_flowgraph::io::stream();
(
CostasLoop {
input,
output,
center_freq: start_frequency,
nco: start_frequency.into(),
loop_filter: oxydsp_dsp::filtering::fir::FirFilter::new(loop_filter),
low_pass: oxydsp_dsp::filtering::fir::FirFilter::new(
Fir::lowpass(start_frequency, 100).normalized_len(),
),
},
iq,
)
}
}
impl Block for CostasLoop
{
fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult
{
self.output.push_iter(self.input.iter().map(|x| {
let (signal, tag) = x.into();
let lo = self.nco.next().unwrap();
let iq = self
.low_pass
.next(Complex::new(lo.re * signal, lo.im * signal));
let error = iq.re * iq.im.signum() - iq.im * iq.re.signum();
let correction = self.loop_filter.next(error);
self.nco.set_frequency(DigitalFrequency::from_rad(
self.center_freq.as_rad() + correction as f64,
));
(iq, tag).into()
}));
oxydsp_flowgraph::block::BlockResult::Ok
}
}
pub fn main()
{
demodulator();
//modulator();
}
pub fn modulator()
{
let bit_source = (0..8192).map(|_| [random::<bool>(), random::<bool>()]);
let mut tags = Tags::default();
let (mut iter_source, bits) = IterSource::new(bit_source);
let kill_tag = tags.allocate_tag("Kill tag");
iter_source.tag_last_with(kill_tag.clone());
let (to_iq, iq) = Map::new(bits, |x| match x
{
[true, true] => Complex::new(1., 1.),
[true, false] => Complex::new(1., -1.),
[false, true] => Complex::new(-1., 1.),
[false, false] => Complex::new(-1., -1.),
});
let (pulse_shaper, iq) = PulseShaper::new(iq, Fir::square(200), 200);
let (lo, carrier) = OscillatorSource::new(DigitalFrequency::from_time_frequency(CARRRIER_FREQ, SAMPLE_RATE as f64).into());
let (pulse_shaper, iq) = PulseShaper::new(
iq,
Fir::root_raised_cosine(4 * SAMPLE_PER_SYMBOL, 0.5, SAMPLE_PER_SYMBOL),
SAMPLE_PER_SYMBOL,
);
let (carrier_oscillator, carrier) = OscillatorSource::new(
DigitalFrequency::from_time_frequency(CARRRIER_FREQ, SAMPLE_RATE as f64).into(),
);
let (mixer, passband) = Multiplier::new(iq, carrier);
let (channel, passband) = Scan::new(passband, rand::rngs::SmallRng::seed_from_u64(0), |state, x|
let (tx, rx) = std::sync::mpsc::channel::<Complex<f32>>();
let (graph_killer, passband) = GraphKiller::new(passband, kill_tag);
let tx_sink = TxSink::new(passband, tx);
let graph = flowgraph![
iter_source,
to_iq,
pulse_shaper,
carrier_oscillator,
mixer,
graph_killer,
tx_sink
];
graph.run(1);
let spec = hound::WavSpec {
channels: 1,
sample_rate: SAMPLE_RATE as u32,
bits_per_sample: 16,
sample_format: hound::SampleFormat::Int,
};
let mut writer = hound::WavWriter::create("mod.wav", spec).unwrap();
for iq in rx.iter()
{
x.re + state.sample::<f32, _>(rand_distr::StandardNormal)
});
let amplitude = 0.5 * i16::MAX as f32;
writer.write_sample((iq.re * amplitude) as i16).unwrap();
}
writer.finalize().unwrap();
}
let (zero_if, iq) = ZeroIf::new(passband, DigitalFrequency::from_time_frequency(CARRRIER_FREQ, SAMPLE_RATE as f64).into());
let (matched_filter, iq) = FirFilter::new(iq, Fir::<f32>::square(200));
let (inspect, iq) = Scan::new(iq, (Instant::now(), 0), |(last, counter), x|
pub fn demodulator()
{
let (audio_tx, audio_rx) = std::sync::mpsc::channel();
let (rx_source, signal) = RxSource::new(audio_rx);
let (downconverter, iq) = CostasLoop::new(
signal,
DigitalFrequency::from_time_frequency(CARRRIER_FREQ, SAMPLE_RATE as f64),
Fir::proportional_integral(100, 0.000, 0.0000),
);
//let agc_filter = Fir::proportional_integral(100, 0.1, 0.001);
// let (agc, iq) = Scan::new(iq, 1., |gain, iq|
// {
// let mu = 0.1;
// let mag = iq.abs();
// *gain += mu * (1. - mag * *gain);
//
// iq * *gain
// });
let (matched_filter, iq) = FirFilter::new(
iq,
Fir::<f32>::root_raised_cosine(4 * SAMPLE_PER_SYMBOL, 0.5, SAMPLE_PER_SYMBOL)
.normalized_sqr(),
);
let (eye_i_tx, eye_i_rx) = std::sync::mpsc::channel::<Vec<f32>>();
let (eye_q_tx, eye_q_rx) = std::sync::mpsc::channel::<Vec<f32>>();
let (constellation_tx, constellation_rx) = std::sync::mpsc::channel();
// let (debug, iq) = Scan::new(
// iq,
// (
// HistoryBuf::new(0., SAMPLE_PER_SYMBOL * 2),
// HistoryBuf::new(0., SAMPLE_PER_SYMBOL * 2),
// 0usize,
// ),
// move |(buf_i, buf_q, counter), x| {
// buf_i.push(x.re);
// buf_q.push(x.im);
// let _ = constellation_tx.send(x);
// *counter += 1;
// if *counter >= SAMPLE_PER_SYMBOL * 2
// {
// let _ = eye_i_tx.send(buf_i.as_slice().iter().copied().collect::<Vec<_>>());
// let _ = eye_q_tx.send(buf_q.as_slice().iter().copied().collect::<Vec<_>>());
// *counter = 0;
// }
// x
// },
// );
let tx_sink = TxSink::new(iq, constellation_tx);
let host = cpal::default_host();
let device = host
.default_input_device()
.expect("no output device available");
let mut supported_configs_range = device
.supported_input_configs()
.expect("error while querying configs");
let supported_config = supported_configs_range
.next()
.expect("no supported config?!")
.with_sample_rate(SAMPLE_RATE as u32);
let _stream = device
.build_input_stream(
&supported_config.into(),
move |data: &[f32], _: &cpal::InputCallbackInfo| {
for x in data
{
//let _ = audio_tx.send(*x * 10.);
let _ = audio_tx.send(random::<f32>());
//let _ = audio_tx.send(Complex::new(random::<f32>(), random::<f32>()));
}
},
move |_err| {},
None, // None=blocking, Some(Duration)=timeout
)
.unwrap();
let graph = flowgraph![
rx_source,
downconverter,
// agc,
matched_filter,
//debug,
//null_sink
tx_sink
];
graph.run(6);
let mut eye_i_history = HistoryBuf::new(vec![], 200);
let mut eye_q_history = HistoryBuf::new(vec![], 200);
let mut constellation = HistoryBuf::new(Complex::new(0., 0.), 5000);
eframe::run_simple_native("Window", Default::default(), move |ctx, _frame| {
for eye in eye_i_rx.try_iter().take(200)
{
*counter += 1;
if *counter >= 1_000_000
{
let time = Instant::now() - *last;
println!("{:.2} Ms/s", 1. / time.as_secs_f32());
*last = Instant::now();
*counter = 0;
}
x
});
let null_sink = NullSink::new(iq);
eye_i_history.push(eye);
}
for eye in eye_q_rx.try_iter().take(200)
{
eye_q_history.push(eye);
}
for point in constellation_rx.try_iter().take(5000)
{
constellation.push(point);
}
let graph = flowgraph![iter_source, iq_map, pulse_shaper, lo, mixer, channel, zero_if, matched_filter, inspect, null_sink];
graph.run(6).join();
egui::CentralPanel::default().show(ctx, |ui| {
egui_plot::Plot::new("plot")
.data_aspect(1.)
.show(ui, |plot_ui| {
plot_ui.points(
Points::new(
"Constellation",
constellation
.as_slice()
.iter()
.map(|point| [point.re as f64, point.im as f64])
.collect::<PlotPoints>(),
)
.color(Color32::YELLOW.gamma_multiply_u8(70)),
);
for (eye_i, eye_q) in eye_i_history
.as_slice()
.iter()
.zip(eye_q_history.as_slice().iter())
{
plot_ui.line(
Line::new(
"In-Phase",
eye_i
.iter()
.enumerate()
.map(|(i, x)| {
[i as f64 / (SAMPLE_PER_SYMBOL as f64 * 2.) + 1., *x as f64]
})
.collect::<Vec<_>>(),
)
.color(Color32::RED),
);
plot_ui.line(
Line::new(
"Quadrature",
eye_q
.iter()
.enumerate()
.map(|(i, x)| {
[*x as f64, i as f64 / (SAMPLE_PER_SYMBOL as f64 * 2.) - 2.]
})
.collect::<Vec<_>>(),
)
.color(Color32::GREEN),
);
plot_ui.points(
Points::new(
"Constellation",
eye_i
.iter()
.zip(eye_q.iter())
.skip(SAMPLE_PER_SYMBOL / 2)
.step_by(SAMPLE_PER_SYMBOL)
.map(|(i, q)| [*i as f64, *q as f64])
.collect::<PlotPoints>(),
)
.color(Color32::GREEN)
.radius(1.5),
);
}
});
});
ctx.request_repaint();
})
.unwrap();
}
pub fn to_bits(n: u8) -> [bool; 8]

View File

@ -2,11 +2,8 @@ use num::Zero;
use oxydsp_flowgraph::BlockIO;
use oxydsp_flowgraph::block::Block;
use oxydsp_flowgraph::block::BlockResult;
use oxydsp_flowgraph::block::SyncBlock;
use oxydsp_flowgraph::io::In;
use oxydsp_flowgraph::io::Out;
use oxydsp_flowgraph::io::PopIterable;
use oxydsp_flowgraph::sync_block;
use std::iter::Sum;
use std::ops::Add;
use std::ops::Mul;
@ -57,7 +54,7 @@ where
{
fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult
{
self.output.push_iter(self.input.pop_iter().map(|x| (self.filter.next(x.0), x.1).into()));
self.output.push_iter(self.input.iter().map(|x| (self.filter.next(x.0), x.1).into()));
BlockResult::Ok
}
}

View File

@ -47,14 +47,14 @@ impl<T: 'static + std::ops::Mul<Output = T> + std::iter::Sum + std::clone::Clone
{
fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult
{
let reader = self.input.read();
let writer = self.output.write();
let mut reader = self.input.iter();
let mut writer = self.output.write_push();
for _ in 0..writer.len()
{
if self.remaining == 0
{
if let Some(input) = reader.pop()
if let Some(input) = reader.next()
{
let (data, tag) = input.into();
let _ = writer.push((self.pulse_shaper.next(data), tag).into());

View File

@ -1,12 +1,11 @@
use std::fmt::Debug;
use num::Complex;
use num::Float;
use oxydsp_flowgraph::BlockIO;
use oxydsp_flowgraph::block::Block;
use oxydsp_flowgraph::block::SyncBlock;
use oxydsp_flowgraph::io::In;
use oxydsp_flowgraph::io::Out;
use oxydsp_flowgraph::io::PopIterable;
use oxydsp_flowgraph::sync_block;
use rustfft::FftNum;
use crate::filtering::fir::Fir;
@ -62,7 +61,7 @@ where
{
fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult
{
self.output.push_iter(self.input.pop_iter().map(|input| {
self.output.push_iter(self.input.iter().map(|input| {
let (data, tag) = input.into();
// Mix
let lo_sample = self.local_oscillator.next().unwrap();

View File

@ -4,15 +4,11 @@ use std::ops::Mul;
use oxydsp_flowgraph::BlockIO;
use oxydsp_flowgraph::block::Block;
use oxydsp_flowgraph::block::BlockResult;
use oxydsp_flowgraph::block::SyncBlock;
use oxydsp_flowgraph::io::In;
use oxydsp_flowgraph::io::Out;
use oxydsp_flowgraph::io::PopIterable;
use oxydsp_flowgraph::sync_block;
use oxydsp_flowgraph::tag::TagMergable;
use oxydsp_flowgraph::tag::Tag;
#[derive(BlockIO)]
#[sync_block]
pub struct Adder<Ia, Ib, O>
where
Ia: Add<Ib, Output = O> + 'static,
@ -49,35 +45,26 @@ where
}
}
impl<'view, Ia, Ib, O> SyncBlock<'view> for Adder<Ia, Ib, O>
impl<Ia, Ib, O> Block for Adder<Ia, Ib, O>
where
Ia: Add<Ib, Output = O> + 'static,
Ib: 'static,
O: 'static,
{
fn sync_work(_state: Self::StateView, input: Self::Input) -> Option<Self::Output>
fn work(&mut self) -> BlockResult
{
Some(input.0 + input.1)
self.output.push_iter(
self.input_a
.iter()
.zip(self.input_b.iter())
.map(|(x, y)| {
(x.0 + y.0, Tag::from_tag_opts(&[&x.1, &y.1])).into()
})
);
BlockResult::Ok
}
}
// impl<Ia, Ib, O> Block for Adder<Ia, Ib, O>
// where
// Ia: Add<Ib, Output = O> + 'static,
// Ib: 'static,
// O: 'static,
// {
// fn work(&mut self) -> BlockResult
// {
// self.output.push_iter(
// (&mut self.input_a, &mut self.input_b)
// .pop_iter()
// .map(|(a, b)| (a.0 + b.0, a.1.merge(&b.1))),
// );
// BlockResult::Ok
// }
// }
#[derive(BlockIO)]
pub struct Multiplier<Ia, Ib, O>
where
@ -124,9 +111,12 @@ where
fn work(&mut self) -> BlockResult
{
self.output.push_iter(
(&mut self.input_a, &mut self.input_b)
.pop_iter()
.map(|(a, b)| (a.0 * b.0, a.1.merge(&b.1)).into()),
self.input_a
.iter()
.zip(self.input_b.iter())
.map(|(x, y)| {
(x.0 * y.0, Tag::from_tag_opts(&[&x.1, &y.1])).into()
})
);
BlockResult::Ok
}

View File

@ -6,7 +6,6 @@ use oxydsp_flowgraph::block::Block;
use oxydsp_flowgraph::block::BlockResult;
use oxydsp_flowgraph::io::In;
use oxydsp_flowgraph::io::Out;
use oxydsp_flowgraph::io::PopIterable;
use oxydsp_flowgraph::io::stream;
#[derive(BlockIO)]
@ -81,7 +80,7 @@ impl<T: Float + From<f32> + 'static> Block for Nco<T>
fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult
{
self.output
.push_iter(&mut self.frequency.pop_iter().map(|f| {
.push_iter(&mut self.frequency.iter().map(|f| {
self.nco.set_frequency(f.0);
(self.nco.next().unwrap(), f.1).into()
}));

View File

@ -1,21 +1,19 @@
use std::collections::VecDeque;
use std::iter::Sum;
use num::Float;
use num::NumCast;
use oxydsp_flowgraph::BlockIO;
use oxydsp_flowgraph::block::SyncBlock;
use oxydsp_flowgraph::block::Block;
use oxydsp_flowgraph::io::In;
use oxydsp_flowgraph::io::Out;
use oxydsp_flowgraph::sync_block;
use oxydsp_flowgraph::tag::Tag;
use oxydsp_flowgraph::tag::TagKey;
use crate::filtering::fir::Fir;
use crate::filtering::fir::FirFilter;
use crate::filtering::history_buf::HistoryBuf;
#[derive(BlockIO)]
#[sync_block(tagged)]
pub struct EarlyLateGate<T: Float + Send + Sync + Sum + Clone + NumCast + 'static>
{
#[input]
@ -27,7 +25,7 @@ pub struct EarlyLateGate<T: Float + Send + Sync + Sum + Clone + NumCast + 'stati
symbol_length: usize,
// Window looking at symbol_length samples at a time
window: VecDeque<T>,
window: HistoryBuf<T>,
// The current location of the window, in relation to the last sample
window_location: usize,
@ -44,7 +42,7 @@ pub struct EarlyLateGate<T: Float + Send + Sync + Sum + Clone + NumCast + 'stati
impl<T> EarlyLateGate<T>
where
T: Float + Sum + Clone + 'static + Send + Sync + NumCast,
T: Float + Sum + Clone + 'static + Send + Sync + NumCast + Default,
{
pub fn new(
input: In<T>,
@ -58,7 +56,7 @@ where
Self {
input,
output,
window: VecDeque::with_capacity(symbol_length),
window: HistoryBuf::new(Default::default(), symbol_length),
symbol_length,
window_location: 0,
window_center: symbol_length / 2,
@ -72,48 +70,44 @@ where
}
}
impl<'view, T> SyncBlock<'view> for EarlyLateGate<T>
impl<'view, T> Block for EarlyLateGate<T>
where
T: Float + Sum + Clone + 'static + Send + Sync + NumCast,
{
fn sync_work(state: Self::StateView, input: Self::Input) -> Option<Self::Output>
fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult
{
if state.window.len() < *state.symbol_length
{
state.window.push_back(input.0);
*state.window_location += 1;
return Some(input.0.into());
}
self.output.push_iter(self.input.iter().map(|input| {
// Bring new sample in
self.window.push(input.0);
self.window_location += 1;
// Bring new sample in
state.window.pop_front();
state.window.push_back(input.0);
*state.window_location += 1;
let sample = self.window.as_slice()[self.window_center];
let mut tag = None;
if self.window_location >= self.next_sample as usize
{
let early_index = self.window_center - (0.25 * self.symbol_length as f32) as usize;
let late_index = self.window_center + (0.25 * self.symbol_length as f32) as usize;
let sample = state.window[*state.window_center];
let mut tag = None;
if *state.window_location >= *state.next_sample as usize
{
let early_index = *state.window_center - (0.25 * *state.symbol_length as f32) as usize;
let late_index = *state.window_center + (0.25 * *state.symbol_length as f32) as usize;
let early_sample = self.window.as_slice()[early_index];
let late_sample = self.window.as_slice()[late_index];
let early_sample = state.window[early_index];
let late_sample = state.window[late_index];
let error = (late_sample - early_sample) * sample;
let correction = self.loop_filter.next(error);
let error = (late_sample - early_sample) * sample;
let correction = state.loop_filter.next(error);
// Figure out next sample location
self.next_sample +=
(self.symbol_length as f32 + correction.to_f32().unwrap()).max(0.);
// Figure out next sample location
*state.next_sample +=
(*state.symbol_length as f32 + correction.to_f32().unwrap()).max(0.);
// Turn everything back relative to current sample
self.next_sample -= self.window_location as f32;
self.window_location = 0;
// Turn everything back relative to current sample
*state.next_sample -= *state.window_location as f32;
*state.window_location = 0;
tag = Some(Tag::with_entry(self.symbol_tag_key.clone(), error));
}
tag = Some(Tag::with_entry(state.symbol_tag_key.clone(), error));
}
(sample, tag).into()
}));
Some((sample, tag).into())
oxydsp_flowgraph::block::BlockResult::Ok
}
}

View File

@ -2,3 +2,4 @@ pub mod adapters;
pub mod channels;
pub mod iter;
pub mod squelch;
pub mod graph_control;

View File

@ -3,12 +3,9 @@ use std::iter::FusedIterator;
use oxydsp_flowgraph::BlockIO;
use oxydsp_flowgraph::block::Block;
use oxydsp_flowgraph::block::BlockResult;
use oxydsp_flowgraph::block::SyncBlock;
use oxydsp_flowgraph::io::In;
use oxydsp_flowgraph::io::Out;
use oxydsp_flowgraph::io::PopIterable;
use oxydsp_flowgraph::io::stream;
use oxydsp_flowgraph::sync_block;
use oxydsp_flowgraph::tag::Tag;
use oxydsp_flowgraph::tag::TagMergable;
use oxydsp_flowgraph::tag::Tagged;
@ -44,7 +41,7 @@ where
{
self.output.push_iter(
self.input
.pop_iter()
.iter()
.map(|x| ((&self.map)(x.0), x.1).into()),
);
BlockResult::Ok
@ -80,12 +77,12 @@ where
{
fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult
{
let writer = self.output.write();
let reader = self.input.read();
let mut writer = self.output.write_push();
let mut reader = self.input.iter();
for _ in 0..(writer.len().min(reader.len()))
{
let (input, tag_opt) = reader.pop().unwrap().into();
let (input, tag_opt) = reader.next().unwrap().into();
let (output, result) = (self.map)(input);
let _ = writer.push((output, tag_opt).into());
match result
@ -131,12 +128,12 @@ where
{
fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult
{
let writer = self.output.write();
let reader = self.input.read();
let mut writer = self.output.write_push();
let mut reader = self.input.iter();
for _ in 0..(writer.len().min(reader.len()))
{
let (input, tag_opt) = reader.pop().unwrap().into();
let (input, tag_opt) = reader.next().unwrap().into();
let (tagged_out, result) = (self.map)((input, tag_opt.clone()).into());
let (output, tag_out) = tagged_out.into();
@ -196,15 +193,18 @@ where
O: 'static,
F: Fn(&mut S, I) -> O,
{
fn work(&mut self) -> BlockResult {
self.output.push_iter(self.input.pop_iter()
.map(|x| ((self.map)(&mut self.state, x.0), x.1).into()));
fn work(&mut self) -> BlockResult
{
self.output.push_iter(
self.input
.iter()
.map(|x| ((self.map)(&mut self.state, x.0), x.1).into()),
);
BlockResult::Ok
}
}
#[derive(BlockIO)]
#[sync_block(tagged)]
pub struct ScanTagged<I: 'static, O: 'static, S, F>
where
F: Fn(&mut S, Tagged<I>) -> Tagged<O>,
@ -239,16 +239,20 @@ where
}
}
impl<'view, I, O, S, F> SyncBlock<'view> for ScanTagged<I, O, S, F>
impl<I, O, S, F> Block for ScanTagged<I, O, S, F>
where
I: 'static,
O: 'static,
S: 'view,
F: Fn(&mut S, Tagged<I>) -> Tagged<O> + 'view,
F: Fn(&mut S, Tagged<I>) -> Tagged<O>,
{
fn sync_work(state: Self::StateView, input: Self::Input) -> Option<Self::Output>
fn work(&mut self) -> BlockResult
{
Some((*state.map)(state.state, input))
self.output.push_iter(self.input.iter().map(|tagged| {
let cloned_tag = tagged.1.clone();
let (output, tag) = (self.map)(&mut self.state, (tagged.0, cloned_tag).into()).into();
(output, Tag::from_tag_opts(&[&tagged.1, &tag])).into()
}));
BlockResult::Ok
}
}
@ -288,15 +292,15 @@ impl<T: Clone + 'static> Block for Repeat<T>
{
fn work(&mut self) -> BlockResult
{
let writer = self.output.write();
let reader = self.input.read();
let mut writer = self.output.write_push();
let mut reader = self.input.iter();
let len = writer.len().min(reader.len() / self.repetitions + 1);
for _ in 0..len
{
if self.remaining == 0 || self.current.is_none()
{
if let Some(x) = reader.pop()
if let Some(x) = reader.next()
{
self.current = Some(x.into());
self.remaining = self.repetitions;
@ -324,7 +328,6 @@ impl<T: Clone + 'static> Block for Repeat<T>
}
#[derive(BlockIO)]
#[sync_block]
pub struct NullSink<T: 'static>
{
#[input]
@ -339,12 +342,12 @@ impl<T: 'static> NullSink<T>
}
}
impl<'view, I: 'static> SyncBlock<'view> for NullSink<I>
impl<I: 'static> Block for NullSink<I>
{
fn sync_work(_: Self::StateView, _: Self::Input) -> Option<Self::Output>
fn work(&mut self) -> BlockResult
{
// Don't do shit !
Some(())
self.input.iter().for_each(|_| ());
BlockResult::Ok
}
}
@ -383,12 +386,9 @@ impl<T: 'static + Clone> Block for Tee<T>
{
fn work(&mut self) -> BlockResult
{
let writer_a = self.output_a.write();
let writer_b = self.output_b.write();
for x in self
.input
.pop_iter()
.take(writer_a.len().min(writer_b.len()))
let mut writer_a = self.output_a.write_push();
let mut writer_b = self.output_b.write_push();
for x in self.input.iter().take(writer_a.len().min(writer_b.len()))
{
let _ = writer_a.push(x.clone());
let _ = writer_b.push(x);
@ -446,8 +446,8 @@ where
{
fn work(&mut self) -> BlockResult
{
let writer = self.output.write();
let reader = self.input.read();
let mut writer = self.output.write_push();
let mut reader = self.input.iter();
let max_write = writer.len();
let mut written = 0;
@ -472,7 +472,7 @@ where
if self.current_iter.is_none()
{
// Get input
if let Some(input) = reader.pop()
if let Some(input) = reader.next()
{
let mut new_iter = (self.map)(input.0).into_iter();
if let Some(first_elt) = new_iter.next()

View File

@ -7,7 +7,6 @@ use oxydsp_flowgraph::block::Block;
use oxydsp_flowgraph::block::BlockResult;
use oxydsp_flowgraph::io::In;
use oxydsp_flowgraph::io::Out;
use oxydsp_flowgraph::io::PopIterable;
use oxydsp_flowgraph::io::stream;
#[derive(BlockIO)]
@ -49,16 +48,10 @@ impl<I: 'static> Block for RxSource<Receiver<I>, I>
{
fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult
{
if self
self
.output
.push_iter(self.input.try_iter().map(|x| (x, None).into()))
{
BlockResult::Ok
}
else
{
BlockResult::Terminated
}
.push_iter(self.input.try_iter().map(|x| (x, None).into()));
BlockResult::Ok
}
}
@ -68,7 +61,7 @@ impl<I: 'static> Block for TxSink<Sender<I>, I>
{
if self
.input
.pop_iter()
.iter()
.map(|x| self.output.send(x.0))
.any(|res| res.is_err())
{
@ -87,7 +80,7 @@ impl<I: 'static> Block for TxSink<SyncSender<I>, I>
{
if self
.input
.pop_iter()
.iter()
.map(|x| self.output.send(x.0))
.any(|res| res.is_err())
{

View File

@ -0,0 +1,53 @@
use oxydsp_flowgraph::{BlockIO, block::Block, io::{In, Out}, tag::TagKey};
#[derive(BlockIO)]
pub struct GraphKiller<T: 'static, K: Send>
{
#[input]
input: In<T>,
#[output]
output: Out<T>,
kill_tag: TagKey<K>
}
impl<T: 'static, K: Send + Send + 'static> GraphKiller<T, K>
{
pub fn new(input: In<T>, kill_on: TagKey<K>) -> (Self, In<T>)
{
let (output, port) = oxydsp_flowgraph::io::stream();
(
GraphKiller
{
input,
output,
kill_tag: kill_on,
},
port
)
}
}
impl<T: 'static, K: Send + Sync + 'static> Block for GraphKiller<T, K>
{
fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult
{
let mut reader = self.input.iter();
let mut writer = self.output.write_push();
for _ in 0..reader.len().min(writer.len())
{
let (data, tag) = reader.next().unwrap().into();
if tag.as_ref().is_some_and(|t| t.retrieve(&self.kill_tag).is_some())
{
return oxydsp_flowgraph::block::BlockResult::Exit;
}
let _ = writer.push((data, tag).into());
}
oxydsp_flowgraph::block::BlockResult::Ok
}
}

View File

@ -52,7 +52,7 @@ where
{
fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult
{
let writer = self.output.write();
let mut writer = self.output.write_push();
for _ in 0..writer.len()
{

View File

@ -11,7 +11,6 @@ use oxydsp_flowgraph::block::Block;
use oxydsp_flowgraph::block::BlockResult;
use oxydsp_flowgraph::io::In;
use oxydsp_flowgraph::io::Out;
use oxydsp_flowgraph::io::PopIterable;
#[derive(BlockIO)]
pub struct Squelch<T>
@ -61,8 +60,8 @@ where
{
fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult
{
let writer = self.output.write();
for x in self.input.pop_iter().take(writer.len())
let mut writer = self.output.write_push();
for x in self.input.iter().take(writer.len())
{
let (element, tag) = x.into();

View File

@ -6,8 +6,6 @@ use num::Zero;
use num::complex::ComplexFloat;
use rustfft::FftNum;
use rustfft::FftPlanner;
use std::array;
use std::collections::VecDeque;
use std::f64::consts::PI;
use std::iter::Sum;
use std::ops::Add;

View File

@ -56,7 +56,7 @@ fn block_io_get_inputs(fields: zyn::syn::Fields) -> zyn::TokenStream
{
let fields = fields.as_named().unwrap().named.clone();
zyn::zyn!(
fn get_inputs_mut(&mut self) -> Vec<&mut dyn oxydsp_flowgraph::io::AnonymousIn>
fn get_inputs_mut(&mut self) -> Vec<&mut dyn oxydsp_flowgraph::io::edge::AnonymousIn>
{
let mut acc = vec![];
use oxydsp_flowgraph::block::BlockInput;
@ -67,7 +67,7 @@ fn block_io_get_inputs(fields: zyn::syn::Fields) -> zyn::TokenStream
acc
}
fn get_inputs(&self) -> Vec<&dyn oxydsp_flowgraph::io::AnonymousIn>
fn get_inputs(&self) -> Vec<&dyn oxydsp_flowgraph::io::edge::AnonymousIn>
{
let mut acc = vec![];
use oxydsp_flowgraph::block::BlockInput;
@ -85,7 +85,7 @@ fn block_io_get_outputs(fields: zyn::syn::Fields) -> zyn::TokenStream
{
let fields = fields.as_named().unwrap().named.clone();
zyn::zyn!(
fn get_outputs_mut(&mut self) -> Vec<&mut dyn oxydsp_flowgraph::io::AnonymousOut>
fn get_outputs_mut(&mut self) -> Vec<&mut dyn oxydsp_flowgraph::io::edge::AnonymousOut>
{
let mut acc = vec![];
use oxydsp_flowgraph::block::BlockOutput;
@ -96,7 +96,7 @@ fn block_io_get_outputs(fields: zyn::syn::Fields) -> zyn::TokenStream
acc
}
fn get_outputs(&self) -> Vec<&dyn oxydsp_flowgraph::io::AnonymousOut>
fn get_outputs(&self) -> Vec<&dyn oxydsp_flowgraph::io::edge::AnonymousOut>
{
let mut acc = vec![];
use oxydsp_flowgraph::block::BlockOutput;

View File

@ -1,5 +1,5 @@
use crate::io::AnonymousIn;
use crate::io::AnonymousOut;
use crate::io::edge::AnonymousIn;
use crate::io::edge::AnonymousOut;
use crate::io::In;
use crate::io::Out;
use crate::io::edge::BlockIOIndex;

View File

@ -5,7 +5,6 @@ use std::thread::JoinHandle;
use crossbeam_deque::Steal;
use crossbeam_deque::Worker;
use crate::block;
use crate::block::GraphableBlock;
use crate::io::edge::BlockIOIndex;

View File

@ -1,17 +1,17 @@
use std::any::Any;
use std::mem::ManuallyDrop;
use std::mem::MaybeUninit;
use std::sync::Arc;
use std::sync::Mutex;
use crate::stream::StreamConsumer;
use crate::stream::StreamProducer;
use crate::stream::{self};
use crate::stream::StreamReader;
use crate::stream::StreamWriter;
use crate::tag::TagSlot;
use crate::tag::Tagged;
pub mod edge;
use crate::io::edge::BlockIOIndex;
use crate::io::edge::Edge;
/// Represents a input port for a block
@ -34,111 +34,276 @@ pub struct Out<T>
edge: Arc<Mutex<Edge>>,
}
/// Trait to manipulate a block's input in a type agnostic/erased way
pub trait AnonymousIn
// Input Output interfaces
/// Output interface to write elements in a "push" fashion
pub struct OutPush<'a, T>
{
/// Inform the input about the index of the blocks it's in, as well as its port index
fn set_index(&self, index: BlockIOIndex);
data_writer: ManuallyDrop<StreamWriter<'a, T>>,
tag_writer: ManuallyDrop<StreamWriter<'a, TagSlot>>,
/// Returns None or the block index of the block, and the block port of the corresponding
/// Out object
fn get_producer_block(&self) -> Option<BlockIOIndex>;
/// Sets the internal stream object
fn set_anonymous_stream(&mut self, consumer: AnonymousStreamConsumer);
total_length: usize,
written_data: usize,
written_tags: usize,
start_index: usize,
}
/// Trait to manipulate a block's output in a type agnostic/erased way
pub trait AnonymousOut
impl<'a, T> OutPush<'a, T>
{
/// Inform the output about the index of the blocks it's in, as well as its port index
fn set_index(&self, index: BlockIOIndex);
pub fn len(&self) -> usize
{
let data_slices = self.data_writer.slices();
// This gives better performance !
// Probably because it prevents claiming the whole buffer at once
// But this is hacky. It should probably be managed at a lower level
// ((data_slices.0.len() + data_slices.1.len()) / 2) - self.written_data
/// Sets the internal stream object
fn set_anonymous_stream(&mut self, producer: AnonymousStreamProducer);
data_slices.0.len() + data_slices.1.len() - self.written_data
}
/// Returns None or the block index of the block, and the block port of the corresponding
/// In object
fn get_consumer_block(&self) -> Option<BlockIOIndex>;
pub fn is_empty(&self) -> bool
{
self.len() == 0
}
/// Creates the stream with the correct corresponding type, in a type erased way.
pub fn push(&mut self, data: Tagged<T>) -> Result<(), Tagged<T>>
{
if self.written_data >= self.total_length
{
return Err(data);
}
let data_slices = self.data_writer.slices_mut();
let tag_slices = self.tag_writer.slices_mut();
// Write a data
let data_ref = {
if self.written_data < data_slices.0.len()
{
&mut data_slices.0[self.written_data]
}
else
{
&mut data_slices.1[self.written_data - data_slices.0.len()]
}
};
// Index of the taken element within the stream.
*data_ref = MaybeUninit::new(data.0);
let element_index = self.start_index + self.written_data;
self.written_data += 1;
// Check for corresponding tag
let tag_ref = {
if self.written_tags < tag_slices.0.len()
{
&mut tag_slices.0[self.written_tags]
}
else
{
&mut tag_slices.1[self.written_tags - tag_slices.0.len()]
}
};
if let Some(tag) = data.1
{
*tag_ref = MaybeUninit::new(TagSlot {
position: element_index,
tag,
});
self.written_tags += 1;
}
Ok(())
}
}
impl<'a, T> Drop for OutPush<'a, T>
{
fn drop(&mut self)
{
let data_writer =
unsafe { ManuallyDrop::<StreamWriter<'a, T>>::take(&mut self.data_writer) };
let tag_writer =
unsafe { ManuallyDrop::<StreamWriter<'a, TagSlot>>::take(&mut self.tag_writer) };
tag_writer.produce(self.written_tags);
data_writer.produce(self.written_data);
}
}
pub struct InIter<'a, T>
{
data_reader: ManuallyDrop<StreamReader<'a, T>>,
tag_reader: ManuallyDrop<StreamReader<'a, TagSlot>>,
total_length: usize,
total_tag_length: usize,
read_data: usize,
read_tags: usize,
start_index: usize,
}
impl<'a, T> Iterator for InIter<'a, T>
{
type Item = Tagged<T>;
fn next(&mut self) -> Option<Self::Item>
{
if self.read_data >= self.total_length
{
return None;
}
let data_slices = self.data_reader.slices_mut();
let tag_slices = self.tag_reader.slices_mut();
// Take a data
// SAFETY:
// All takable should contain a valid element: guarteed by the queue
// We strictly monotonicly take all elements, in order, in the slices.
// No two same indices should be taken.
// We cound the number of taken elemnts and consume the correct amount from the queue
let data = {
if self.read_data < data_slices.0.len()
{
unsafe { data_slices.0[self.read_data].take() }
}
else
{
unsafe { data_slices.1[self.read_data - data_slices.0.len()].take() }
}
};
// Index of the taken element within the stream.
let element_index = self.start_index + self.read_data;
self.read_data += 1;
// Check for corresponding tag
let mut tag = None;
if self.read_tags < self.total_tag_length
{
let tag_ref = {
if self.read_tags < tag_slices.0.len()
{
&mut tag_slices.0[self.read_tags]
}
else
{
&mut tag_slices.1[self.read_tags - tag_slices.0.len()]
}
};
// SAFETY:
// Same as before : strictly monotic access in the tag slices
if unsafe { tag_ref.peek().position == element_index }
{
// The next tag in line is tagging the just-poped element.
// We get it
tag = Some(unsafe { tag_ref.take().tag });
self.read_tags += 1;
}
}
Some(Tagged::new(data, tag))
}
fn size_hint(&self) -> (usize, Option<usize>)
{
let len =
self.data_reader.slices().0.len() + self.data_reader.slices().1.len() - self.read_data;
(len, Some(len))
}
}
impl<'a, T> Drop for InIter<'a, T>
{
fn drop(&mut self)
{
let data_reader =
unsafe { ManuallyDrop::<StreamReader<'a, T>>::take(&mut self.data_reader) };
let tag_reader =
unsafe { ManuallyDrop::<StreamReader<'a, TagSlot>>::take(&mut self.tag_reader) };
tag_reader.consume(self.read_tags);
data_reader.consume(self.read_data);
}
}
impl<'a, T> ExactSizeIterator for InIter<'a, T> {}
impl<T: 'static> In<T>
{
pub fn iter<'a>(&'a mut self) -> InIter<'a, T>
{
let first_index = self.stream.as_ref().unwrap().first_index();
let data_reader = self.stream.as_mut().unwrap().read_takable();
let total_length = data_reader.slices().0.len() + data_reader.slices().1.len();
let tag_reader = self.tag_stream.as_mut().unwrap().read_takable();
let total_tag_length = tag_reader.slices().0.len() + tag_reader.slices().1.len();
InIter {
data_reader: ManuallyDrop::new(data_reader),
tag_reader: ManuallyDrop::new(tag_reader),
read_data: 0,
read_tags: 0,
total_length,
total_tag_length,
start_index: first_index,
}
}
}
impl<T: 'static> Out<T>
{
pub fn write_push<'a>(&'a mut self) -> OutPush<'a, T>
{
let first_index = self.stream.as_ref().unwrap().first_index();
let data_writer = self.stream.as_mut().unwrap().write();
let total_length = data_writer.slices().0.len() + data_writer.slices().1.len();
let tag_writer = self.tag_stream.as_mut().unwrap().write();
OutPush {
data_writer: ManuallyDrop::new(data_writer),
tag_writer: ManuallyDrop::new(tag_writer),
total_length,
written_data: 0,
written_tags: 0,
start_index: first_index,
}
}
/// Pushes an iterator to the output, sending the maximum amount of elements
/// to the output.
///
/// This delegation of stream creation is necessary to allow the graph to manipulate
/// it, as it cannot know about the generic type of the stream.
fn create_anonymous_stream(
&self,
capacity: usize,
) -> (AnonymousStreamProducer, AnonymousStreamConsumer);
}
impl<T: 'static> AnonymousIn for In<T>
{
fn set_index(&self, index: BlockIOIndex)
/// It will not consume the iterator more than what can be sent.
///
/// ```
/// let writer = output.write();
///
/// // Send only 42s to the output
/// writer.push_iter(std::iter::repeat(42));
/// ```
pub fn push_iter<I: Iterator<Item = Tagged<T>>>(&mut self, mut iter: I) -> bool
{
self.edge.lock().unwrap().to = Some(index);
let mut pusher = self.write_push();
let mut len = pusher.len();
while len > 0
{
len -= 1;
match iter.next()
{
Some(element) => {let _ = pusher.push(element); },
None => return false,
}
}
return false;
}
fn get_producer_block(&self) -> Option<BlockIOIndex>
/// Meta information
/// Returns a string of the type of the output
pub fn get_type_name(&self) -> &'static str
{
self.edge.lock().unwrap().from
std::any::type_name::<T>()
}
fn set_anonymous_stream(&mut self, consumer: AnonymousStreamConsumer)
{
let (stream, tag_stream) = consumer.downcast::<T>();
self.stream = Some(stream);
self.tag_stream = Some(tag_stream);
}
}
impl<T: 'static> AnonymousOut for Out<T>
{
fn set_index(&self, index: BlockIOIndex)
{
self.edge.lock().unwrap().from = Some(index);
}
fn get_consumer_block(&self) -> Option<BlockIOIndex>
{
self.edge.lock().unwrap().to
}
fn set_anonymous_stream(&mut self, producer: AnonymousStreamProducer)
{
let (stream, tag_stream) = producer.downcast::<T>();
self.stream = Some(stream);
self.tag_stream = Some(tag_stream);
}
// Delegate stream creation to Out object
// which knows the stream type
fn create_anonymous_stream(
&self,
capacity: usize,
) -> (AnonymousStreamProducer, AnonymousStreamConsumer)
{
let (tx, rx) = stream::bounded_queue::<T>(capacity);
let (tx_tag, rx_tag) = stream::bounded_queue::<TagSlot>(capacity);
((tx, tx_tag).into(), (rx, rx_tag).into())
}
}
/// A Reader to get data from an input
pub struct InReader<'a, T>
{
data_slice_1: &'a mut [MaybeUninit<T>],
data_slice_2: &'a mut [MaybeUninit<T>],
// data_reader: StreamReader<'a, T>,
// tag_reader: StreamReader<'a, TagSlot>,
}
/// A writer to send data to an output
pub struct OutWriter<'a, T>
{
data_slice_1: &'a mut [MaybeUninit<T>],
data_slice_2: &'a mut [MaybeUninit<T>],
// data_writer: StreamWriter<'a, T>,
// tag_writer: StreamWriter<'a, TagSlot>,
}
/// Creates a stream that can then be used to link blocks
@ -168,207 +333,50 @@ pub fn stream<T>() -> (Out<T>, In<T>)
)
}
impl<T: 'static> In<T>
pub fn streams<T, const N: usize>() -> ([Out<T>; N], [In<T>; N])
{
/// Gets a reader view from an input.
///
/// ```
/// let reader = input.read();
/// let data = reader.pop();
/// ```
pub fn read<'a>(&'a mut self) -> InReader
{
// let data_reader = self.stream.as_mut().unwrap().read();
// let tag_reader = self.tag_stream.as_mut().unwrap().read();
InReader {
// data_reader,
// tag_reader,
}
}
// Ugly simultanous initialization
let mut ins: [_; N] = std::array::from_fn(|_| None);
let mut outs: [_; N] = std::array::from_fn(|_| None);
ins.iter_mut()
.zip(outs.iter_mut())
.for_each(|(input, output)| {
let (newout, newin) = stream();
*input = Some(newin);
*output = Some(newout);
});
let ins_some: [_; N] = std::array::from_fn(|i| ins[i].take().unwrap());
let outs_some: [_; N] = std::array::from_fn(|i| outs[i].take().unwrap());
(outs_some, ins_some)
}
impl<T: 'static> Out<T>
{
/// Gets a reader view from an output.
///
/// ```
/// let writer = output.write();
/// writer.push((data, tag).into());
/// ```
pub fn write<'a>(&'a mut self) -> OutWriter
{
OutWriter {
// data_writer: self.stream.as_mut().unwrap().write(),
// tag_writer: self.tag_stream.as_mut().unwrap().write(),
}
}
/// Pushes an iterator to the output, sending the maximum amount of elements
/// to the output.
///
/// It will not consume the iterator more than what can be sent.
///
/// ```
/// let writer = output.write();
///
/// // Send only 42s to the output
/// writer.push_iter(std::iter::repeat(42));
/// ```
pub fn push_iter<I: Iterator<Item = Tagged<T>>>(&mut self, mut iter: I) -> bool
{
false
// let writer = self.write();
// let len = writer.len();
//
// for _ in 0..len
// {
// if let Some(elt) = iter.next()
// {
// let _ = writer.push(elt);
// }
// else
// {
// return false;
// }
// }
// true
}
/// Meta information
/// Returns a string of the type of the output
pub fn get_type_name(&self) -> &'static str
{
std::any::type_name::<T>()
}
}
// impl InReader
// {
// /// Gets the amount of elements that are available
// /// on the input.
// pub fn len(&self) -> usize
// {
// 0
// //self.data_reader.len()
// }
//
// /// Returns true iif no elements are available on the input.
// pub fn is_empty(&self) -> bool
// {
// //self.len() == 0
// true
// }
//
// /// Pops an element from the input.
// /// It is guaranteed to return `Some(data)` if
// /// if pop was called strictly less times than len
// pub fn pop(&self) -> Option<Tagged<T>>
// {
// None
// // let data = self.data_reader.pop_with_index();
// // if let Some((data, index)) = data
// // {
// // let mut tag = None;
// // if self
// // .tag_reader
// // .peek(|t| t.position)
// // .is_some_and(|x| x == index)
// // {
// // tag = self.tag_reader.pop();
// // }
// // Some((data, tag.map(|t| t.tag)).into())
// // }
// // else
// // {
// // None
// // }
// }
//
// /// Pops an element from the input, discarding the tag.
// /// It is guaranteed to return `Some(data)` if
// /// if pop was called strictly less times than len
// pub fn pop_untag(&self) -> Option<T>
// {
// None
// // self.pop().map(|data| data.into_inner())
// }
//
// }
//
// impl<T> OutWriter<'_, T>
// {
// /// Gets how much room is available on the output
// pub fn len(&self) -> usize
// {
// 0
// //self.data_writer.len().min(self.tag_writer.len())
// }
//
// /// Returns true iif no element can be sent
// pub fn is_empty(&self) -> bool
// {
// true
// //self.len() == 0
// }
//
// /// Pushes some tagged data on the input.
// ///
// /// The operation succeeds (`Ok(())`) if there is enough room
// /// Or fails returning the given data to the caller.
// pub fn push(&self, data: Tagged<T>) -> Result<(), Tagged<T>>
// {
// Ok(())
// // let (data, tag) = data.into();
// // let position = self.data_writer.next_index();
// // let tag = tag.map(|t| TagSlot { position, tag: t });
// //
// // match self.data_writer.push(data)
// // {
// // Ok(_) if tag.is_some() =>
// // {
// // let _ = self.tag_writer.push(tag.unwrap());
// // Ok(())
// // }
// // Ok(_) => Ok(()),
// // Err(data) => Err((data, tag.map(|t| t.tag)).into()),
// // }
// }
//
// /// Pushes some data on the input (not tagged).
// ///
// /// The operation succeeds (`Ok(())`) if there is enough room
// /// Or fails returning the given data to the caller.
// pub fn push_no_tag(&self, data: T) -> Result<(), T>
// {
// Ok(())
// //self.data_writer.push(data)
// }
// }
// --------------------
// Iterator facilites
// --------------------
/// An iterator type to push data to output(s)
pub struct PopIter<T>
{
len: usize,
popped: usize,
reader: T,
}
// An iterator type to push data to output(s)
// pub struct PopIter<T>
// {
// len: usize,
// popped: usize,
// reader: T,
// }
/// Type on which data can be popped from
pub trait PopIterable<'a>
{
type Output;
/// Returns an iterator on the input elements :
///
/// ```
/// (&mut input_a, &mut input_b, &mut input_c).pop_iter().for_each(|(a, b, c)| println!("Got {a}, {b} and {c} !"));
/// ```
fn pop_iter(&'a mut self) -> PopIter<Self::Output>;
}
// Type on which data can be popped from
// pub trait PopIterable<'a>
// {
// type Output;
//
// /// Returns an iterator on the input elements :
// ///
// /// ```
// /// (&mut input_a, &mut input_b, &mut input_c).pop_iter().for_each(|(a, b, c)| println!("Got {a}, {b} and {c} !"));
// /// ```
// fn pop_iter(&'a mut self) -> PopIter<Self::Output>;
// }
// impl<'a, T: 'static> PopIterable<'a> for In<T>
// {
@ -419,69 +427,3 @@ pub trait PopIterable<'a>
// impl_iterator_for_pop_iter_tuple! {10}
// impl_iterator_for_pop_iter_tuple! {11}
// impl_iterator_for_pop_iter_tuple! {12}
/// StreamProducer object for data and tags stored in a type
/// agnostic/erased way.
///
/// This is needed for the graph system to manipulate and pass arround these objects
/// as they can't/don't know about the generic types of the stream objects
pub struct AnonymousStreamProducer
{
inner: Box<dyn Any>,
inner_tag: StreamProducer<TagSlot>,
}
/// StreamConsumer object for data and tags stored in a type
/// agnostic/erased way.
///
/// This is needed for the graph system to manipulate and pass arround these objects
/// as they can't/don't know about the generic types of the stream objects
pub struct AnonymousStreamConsumer
{
inner: Box<dyn Any>,
inner_tag: StreamConsumer<TagSlot>,
}
impl<T: 'static> From<(StreamProducer<T>, StreamProducer<TagSlot>)> for AnonymousStreamProducer
{
fn from(value: (StreamProducer<T>, StreamProducer<TagSlot>)) -> Self
{
AnonymousStreamProducer {
inner: Box::new(value.0),
inner_tag: value.1,
}
}
}
impl<T: 'static> From<(StreamConsumer<T>, StreamConsumer<TagSlot>)> for AnonymousStreamConsumer
{
fn from(value: (StreamConsumer<T>, StreamConsumer<TagSlot>)) -> Self
{
AnonymousStreamConsumer {
inner: Box::new(value.0),
inner_tag: value.1,
}
}
}
impl AnonymousStreamProducer
{
pub(crate) fn downcast<T: 'static>(self) -> (StreamProducer<T>, StreamProducer<TagSlot>)
{
(
*self.inner.downcast::<StreamProducer<T>>().unwrap(),
self.inner_tag,
)
}
}
impl AnonymousStreamConsumer
{
pub(crate) fn downcast<T: 'static>(self) -> (StreamConsumer<T>, StreamConsumer<TagSlot>)
{
(
*self.inner.downcast::<StreamConsumer<T>>().unwrap(),
self.inner_tag,
)
}
}

View File

@ -1,3 +1,7 @@
use std::any::Any;
use crate::{io::{In, Out}, stream::{self, StreamConsumer, StreamProducer}, tag::TagSlot};
/// Shared object between a block's input and output objects
/// so they can "communicate" and know about each other
#[derive(Default)]
@ -21,3 +25,158 @@ pub struct BlockIOIndex
pub block_index: usize,
pub port_index: usize,
}
/// Trait to manipulate a block's input in a type agnostic/erased way
pub trait AnonymousIn
{
/// Inform the input about the index of the blocks it's in, as well as its port index
fn set_index(&self, index: BlockIOIndex);
/// Returns None or the block index of the block, and the block port of the corresponding
/// Out object
fn get_producer_block(&self) -> Option<BlockIOIndex>;
/// Sets the internal stream object
fn set_anonymous_stream(&mut self, consumer: AnonymousStreamConsumer);
}
/// Trait to manipulate a block's output in a type agnostic/erased way
pub trait AnonymousOut
{
/// Inform the output about the index of the blocks it's in, as well as its port index
fn set_index(&self, index: BlockIOIndex);
/// Sets the internal stream object
fn set_anonymous_stream(&mut self, producer: AnonymousStreamProducer);
/// Returns None or the block index of the block, and the block port of the corresponding
/// In object
fn get_consumer_block(&self) -> Option<BlockIOIndex>;
/// Creates the stream with the correct corresponding type, in a type erased way.
///
/// This delegation of stream creation is necessary to allow the graph to manipulate
/// it, as it cannot know about the generic type of the stream.
fn create_anonymous_stream(
&self,
capacity: usize,
) -> (AnonymousStreamProducer, AnonymousStreamConsumer);
}
impl<T: 'static> AnonymousIn for In<T>
{
fn set_index(&self, index: BlockIOIndex)
{
self.edge.lock().unwrap().to = Some(index);
}
fn get_producer_block(&self) -> Option<BlockIOIndex>
{
self.edge.lock().unwrap().from
}
fn set_anonymous_stream(&mut self, consumer: AnonymousStreamConsumer)
{
let (stream, tag_stream) = consumer.downcast::<T>();
self.stream = Some(stream);
self.tag_stream = Some(tag_stream);
}
}
impl<T: 'static> AnonymousOut for Out<T>
{
fn set_index(&self, index: BlockIOIndex)
{
self.edge.lock().unwrap().from = Some(index);
}
fn get_consumer_block(&self) -> Option<BlockIOIndex>
{
self.edge.lock().unwrap().to
}
fn set_anonymous_stream(&mut self, producer: AnonymousStreamProducer)
{
let (stream, tag_stream) = producer.downcast::<T>();
self.stream = Some(stream);
self.tag_stream = Some(tag_stream);
}
// Delegate stream creation to Out object
// which knows the stream type
fn create_anonymous_stream(
&self,
capacity: usize,
) -> (AnonymousStreamProducer, AnonymousStreamConsumer)
{
let (tx, rx) = stream::bounded_queue::<T>(capacity);
let (tx_tag, rx_tag) = stream::bounded_queue::<TagSlot>(capacity);
((tx, tx_tag).into(), (rx, rx_tag).into())
}
}
/// StreamProducer object for data and tags stored in a type
/// agnostic/erased way.
///
/// This is needed for the graph system to manipulate and pass arround these objects
/// as they can't/don't know about the generic types of the stream objects
pub struct AnonymousStreamProducer
{
inner: Box<dyn Any>,
inner_tag: StreamProducer<TagSlot>,
}
/// StreamConsumer object for data and tags stored in a type
/// agnostic/erased way.
///
/// This is needed for the graph system to manipulate and pass arround these objects
/// as they can't/don't know about the generic types of the stream objects
pub struct AnonymousStreamConsumer
{
inner: Box<dyn Any>,
inner_tag: StreamConsumer<TagSlot>,
}
impl<T: 'static> From<(StreamProducer<T>, StreamProducer<TagSlot>)> for AnonymousStreamProducer
{
fn from(value: (StreamProducer<T>, StreamProducer<TagSlot>)) -> Self
{
AnonymousStreamProducer {
inner: Box::new(value.0),
inner_tag: value.1,
}
}
}
impl<T: 'static> From<(StreamConsumer<T>, StreamConsumer<TagSlot>)> for AnonymousStreamConsumer
{
fn from(value: (StreamConsumer<T>, StreamConsumer<TagSlot>)) -> Self
{
AnonymousStreamConsumer {
inner: Box::new(value.0),
inner_tag: value.1,
}
}
}
impl AnonymousStreamProducer
{
pub(crate) fn downcast<T: 'static>(self) -> (StreamProducer<T>, StreamProducer<TagSlot>)
{
(
*self.inner.downcast::<StreamProducer<T>>().unwrap(),
self.inner_tag,
)
}
}
impl AnonymousStreamConsumer
{
pub(crate) fn downcast<T: 'static>(self) -> (StreamConsumer<T>, StreamConsumer<TagSlot>)
{
(
*self.inner.downcast::<StreamConsumer<T>>().unwrap(),
self.inner_tag,
)
}
}

View File

@ -52,6 +52,36 @@ unsafe impl<T: Send> Sync for StreamProducer<T> {}
unsafe impl<T: Send> Send for StreamConsumer<T> {}
unsafe impl<T: Send> Sync for StreamConsumer<T> {}
#[repr(transparent)]
pub struct Takable<T>(MaybeUninit<T>);
impl<T> Takable<T>
{
pub fn new(element: T) -> Self
{
Takable(MaybeUninit::new(element))
}
pub unsafe fn take(&mut self) -> T
{
unsafe { std::mem::replace(&mut self.0, MaybeUninit::uninit()).assume_init() }
}
pub unsafe fn peek(&self) -> &T
{
unsafe { self.0.assume_init_ref() }
}
pub unsafe fn peek_mut(&mut self) -> &mut T
{
unsafe { self.0.assume_init_mut() }
}
}
unsafe fn takable_slice_from_maybe_uninitt<T>(slice: &mut [MaybeUninit<T>]) -> &mut [Takable<T>]
{
unsafe { std::mem::transmute(slice) }
}
pub fn bounded_queue<T>(capacity: usize) -> (StreamProducer<T>, StreamConsumer<T>)
{
@ -89,8 +119,65 @@ pub fn bounded_queue<T>(capacity: usize) -> (StreamProducer<T>, StreamConsumer<T
)
}
pub struct StreamReader<'a, T>
{
slices: (&'a mut [Takable<T>], &'a mut [Takable<T>]),
// UNSAFE !
consumer: &'a mut StreamConsumer<T>,
}
pub struct StreamWriter<'a, T>
{
slices: (&'a mut [MaybeUninit<T>], &'a mut [MaybeUninit<T>]),
// UNSAFE !
producer: &'a mut StreamProducer<T>,
}
impl<'a, T> StreamReader<'a, T>
{
pub fn slices(&self) -> (&[Takable<T>], &[Takable<T>])
{
(self.slices.0, self.slices.1)
}
pub fn slices_mut(&mut self) -> (&mut [Takable<T>], &mut [Takable<T>])
{
(self.slices.0, self.slices.1)
}
pub fn consume(self, amount: usize)
{
self.consumer.consume(amount);
}
}
impl<'a, T> StreamWriter<'a, T>
{
pub fn slices(&self) -> (&[MaybeUninit<T>], &[MaybeUninit<T>])
{
(self.slices.0, self.slices.1)
}
pub fn slices_mut(&mut self) -> (&mut [MaybeUninit<T>], &mut [MaybeUninit<T>])
{
(self.slices.0, self.slices.1)
}
pub fn produce(self, amount: usize)
{
self.producer.produce(amount);
}
}
impl<T> StreamProducer<T>
{
pub fn first_index(&self) -> usize
{
self.inner.head.load(Ordering::Relaxed)
}
pub fn produce(&mut self, written: usize)
{
// Advance head.
@ -101,12 +188,10 @@ impl<T> StreamProducer<T>
assert!(head + written - tail <= (self.inner.capacity_mask + 1));
// We want writes to the buffer to be visible when acquired in the pop side
self.inner
.head
.store(head + written, Ordering::Release);
self.inner.head.store(head + written, Ordering::Release);
}
pub fn write(&mut self) -> (&mut [MaybeUninit<T>], &mut [MaybeUninit<T>])
pub fn write<'a>(&'a mut self) -> StreamWriter<'a, T>
{
// We need to claim the maximum amount of elements.
let tail = self.inner.tail.load(Ordering::Acquire);
@ -139,18 +224,18 @@ impl<T> StreamProducer<T>
let k = &mut *self.inner.buffer.get();
let (start_to_head, head_to_end) = k.split_at_mut_unchecked(wrapped_head);
let (start_to_tail, _tail_to_head) =
start_to_head.split_at_mut_unchecked(wrapped_tail);
// This functions borrows the stream mutably. As such, only one instance
// of these slices can exist for a given stream.
(head_to_end, start_to_tail)
StreamWriter {
slices: (start_to_head, head_to_end),
producer: self,
}
}
}
else
{
// We MUST have : tail < head
if wrapped_tail < wrapped_head
{
// Current configuration :
@ -158,7 +243,7 @@ impl<T> StreamProducer<T>
// | |
// tail head
// ___ ____
// slice1 slice2
// slice2 slice1
// SAFETY:
//
@ -175,7 +260,10 @@ impl<T> StreamProducer<T>
let (start_to_tail, _tail_to_head) =
start_to_head.split_at_mut_unchecked(wrapped_tail);
(head_to_end, start_to_tail)
StreamWriter {
slices: (head_to_end, start_to_tail),
producer: self,
}
}
}
else
@ -203,10 +291,14 @@ impl<T> StreamProducer<T>
// Head and tail are both indices of the slice
unsafe {
let k = &mut *self.inner.buffer.get();
let (_start_to_head, head_to_tail) = k.split_at_mut_unchecked(wrapped_head);
let (head_to_tail, empty_slice) =
head_to_tail.split_at_mut_unchecked(wrapped_tail - wrapped_head);
(head_to_tail, empty_slice)
let (start_to_tail, _tail_to_end) = k.split_at_mut_unchecked(wrapped_tail);
let (_start_to_head, head_to_tail) = start_to_tail.split_at_mut_unchecked(wrapped_head);
let (empty_slice, head_to_tail) = head_to_tail.split_at_mut_unchecked(0);
StreamWriter {
slices: (head_to_tail, empty_slice),
producer: self,
}
}
}
}
@ -222,15 +314,18 @@ impl<T> StreamConsumer<T>
let tail = self.inner.tail.load(Ordering::Relaxed);
// Check bounds
assert!(tail + read <= head);
assert!(tail + read <= head);
// We want writes to the buffer to be visible when acquired in the pop side
self.inner
.tail
.store(tail + read, Ordering::Release);
self.inner.tail.store(tail + read, Ordering::Release);
}
pub fn read_uninit(&mut self) -> (&mut [MaybeUninit<T>], &mut [MaybeUninit<T>])
pub fn first_index(&self) -> usize
{
self.inner.tail.load(Ordering::Relaxed)
}
pub fn read_takable<'a>(&'a mut self) -> StreamReader<'a, T>
{
// We need to claim the maximum amount of elements.
let head = self.inner.head.load(Ordering::Acquire);
@ -247,14 +342,20 @@ impl<T> StreamConsumer<T>
// Buffer is empty. Return empty slice
unsafe {
let k = &mut *self.inner.buffer.get();
let head_to_tail = &mut k[wrapped_head..wrapped_tail];
let (empty_1, empty_2) = head_to_tail.split_at_mut_unchecked(0);
(empty_1, empty_2)
let empty = &mut k[0..0];
let (empty_1, empty_2) = empty.split_at_mut_unchecked(0);
StreamReader {
slices: (
takable_slice_from_maybe_uninitt(empty_1),
takable_slice_from_maybe_uninitt(empty_2),
),
consumer: self,
}
}
}
else
{
// Necessarly: wrapped_tail < wrapped_head
// Necessarly: wrapped_tail <= wrapped_head
// Two cases : The buffer overlaps the wrapping or not
if wrapped_tail < wrapped_head
{
@ -275,7 +376,15 @@ impl<T> StreamConsumer<T>
let k = &mut (&mut *self.inner.buffer.get())[wrapped_tail..wrapped_head];
let (tail_to_head, empty_slice) =
k.split_at_mut_unchecked(wrapped_head - wrapped_tail);
(tail_to_head, empty_slice)
assert_eq!(empty_slice.len(), 0);
StreamReader {
slices: (
takable_slice_from_maybe_uninitt(tail_to_head),
takable_slice_from_maybe_uninitt(empty_slice),
),
consumer: self,
}
}
}
else
@ -310,112 +419,31 @@ impl<T> StreamConsumer<T>
let (start_to_head, _head_to_tail) =
start_to_tail.split_at_mut_unchecked(wrapped_head);
(tail_to_end, start_to_head)
StreamReader {
slices: (
takable_slice_from_maybe_uninitt(tail_to_end),
takable_slice_from_maybe_uninitt(start_to_head),
),
consumer: self,
}
}
}
}
}
// Creates a reader of contiguous elements that
// satisfy the predicate
// pub fn read_while<F>(&mut self, predicate: F) -> StreamReader<'_, T>
// where
// F: Fn(&T) -> bool,
// {
// // Take a normal reader. This contains available elements to read.
// let mut reader = self.read();
//
// // We need to trim the slices to keep only the satified elements
//
// // First slice
// let mut first_kept = 0;
// // SAFETY:
// //
// // Only us can have a reference to these slices of the buffer
// for element in unsafe { &*reader.first.get() }
// {
// // SAFETY
// //
// // If this element is in a reader returned by self.read
// // with no pop called, we know it is initialized
// let init_element = unsafe { element.assume_init_ref() };
// let sat = predicate(init_element);
// if !sat
// {
// // Stop here
// // Forget about second slice
// reader.second_len = 0;
// reader.second = None;
//
// // Trim first slice
// reader.first_len = first_kept;
// unsafe {
// reader.first = std::mem::transmute::<
// &[MaybeUninit<T>],
// &UnsafeCell<[MaybeUninit<T>]>,
// >(&(&*reader.first.get())[0..first_kept]);
// }
//
// return reader;
// }
// first_kept += 1;
// }
//
// // If we are here, all of the elements of the first slice, satisfy the predicate
//
// if let Some(second_slice) = &mut reader.second
// {
// // Second slice
// let mut second_kept = 0;
// // SAFETY:
// //
// // Only us can have a reference to these slices of the buffer
// for element in unsafe { &*second_slice.get() }
// {
// // SAFETY
// //
// // If this element is in a reader returned by self.read
// // with no pop called, we know it is initialized
// let init_element = unsafe { element.assume_init_ref() };
// let sat = predicate(init_element);
// if !sat
// {
// // Stop here
// // Trim second slice
// reader.second_len = second_kept;
// unsafe {
// reader.second = Some(std::mem::transmute::<
// &[MaybeUninit<T>],
// &UnsafeCell<[MaybeUninit<T>]>,
// >(
// &(&*second_slice.get())[0..first_kept]
// ));
// }
// return reader;
// }
// second_kept += 1;
// }
// }
//
// return reader;
// }
}
impl<T: Copy> StreamConsumer<T>
{
pub fn read(&mut self) -> (&[T], &[T])
{
let (slice_1, slice_2) = self.read_uninit();
unsafe
{
(std::mem::transmute(slice_1), std::mem::transmute(slice_2))
}
}
}
// impl<T: Copy> StreamConsumer<T>
// {
// pub fn read(&mut self) -> (&[T], &[T])
// {
// let (slice_1, slice_2) = self.read_takable();
// unsafe { (std::mem::transmute(slice_1), std::mem::transmute(slice_2)) }
// }
// }
mod test
{
#[allow(unused_imports)]
use std::mem::MaybeUninit;
#[allow(unused_imports)]
@ -428,7 +456,8 @@ mod test
let (mut tx, mut rx) = bounded_queue::<usize>(4);
{
let (a, b) = tx.write();
let mut writer = tx.write();
let (a, b) = writer.slices_mut();
assert_eq!(a.len(), 4);
assert_eq!(b.len(), 0);
@ -441,24 +470,28 @@ mod test
}
{
let (a, b) = rx.read();
let mut reader = rx.read_takable();
let (a, b) = reader.slices_mut();
assert_eq!(a.len(), 4);
assert_eq!(b.len(), 0);
assert_eq!(a[0], 0);
assert_eq!(a[1], 1);
assert_eq!(a[2], 2);
assert_eq!(a[3], 3);
unsafe {
assert_eq!(a[0].take(), 0);
assert_eq!(a[1].take(), 1);
assert_eq!(a[2].take(), 2);
assert_eq!(a[3].take(), 3);
}
rx.consume(4);
}
// Put stream into weird situation
{
let (a, b) = tx.write();
let mut writer = tx.write();
let (a, b) = writer.slices_mut();
assert_eq!(a.len(), 4);
assert_eq!(b.len(), 0);
a[0] = MaybeUninit::new(0);
a[1] = MaybeUninit::new(1);
a[2] = MaybeUninit::new(2);
@ -467,27 +500,33 @@ mod test
}
{
let (a, b) = rx.read();
let mut reader = rx.read_takable();
let (a, b) = reader.slices_mut();
assert_eq!(a.len(), 3);
assert_eq!(b.len(), 0);
assert_eq!(a[0], 0);
assert_eq!(a[1], 1);
assert_eq!(a[2], 2);
unsafe
{
assert_eq!(a[0].take(), 0);
assert_eq!(a[1].take(), 1);
assert_eq!(a[2].take(), 2);
}
rx.consume(1);
}
{
let (a, b) = tx.write();
let writer = tx.write();
let (a, b) = writer.slices();
assert_eq!(a.len(), 1);
assert_eq!(b.len(), 1);
assert_eq!(b.len(), 1);
}
{
let (a, b) = rx.read();
let reader = rx.read_takable();
let (a, b) = reader.slices();
assert_eq!(a.len(), 2);
assert_eq!(b.len(), 0);
assert_eq!(b.len(), 0);
}
}
}

View File

@ -178,7 +178,7 @@ impl Tag
}
/// Creates a new tag, which is the combination of the given tags
pub fn from_tags<const N: usize>(tag_opts: [&Tag; N]) -> Tag
pub fn from_tags<const N: usize>(tag_opts: &[&Tag; N]) -> Tag
{
let new_tag = Self::default();
{
@ -197,7 +197,7 @@ impl Tag
///
/// If all the tag options are None, None is returned
/// Otherwise it is Some of the combination of all of the tags which are Some
pub fn from_tag_opts<const N: usize>(tag_opts: [&Option<Tag>; N]) -> Option<Tag>
pub fn from_tag_opts<const N: usize>(tag_opts: &[&Option<Tag>; N]) -> Option<Tag>
{
if tag_opts.iter().all(|t| t.is_none())
{
@ -256,7 +256,7 @@ impl TagMergable<Tag> for Tag
{
fn merge(&self, other: &Self) -> Self
{
Self::from_tags([self, other])
Self::from_tags(&[self, other])
}
}
@ -264,7 +264,7 @@ impl TagMergable<Option<Tag>> for Option<Tag>
{
fn merge(&self, other: &Self) -> Self
{
Tag::from_tag_opts([self, other])
Tag::from_tag_opts(&[self, other])
}
}