Compare commits

...

11 Commits

Author SHA1 Message Date
02145b6ef0 Working tx/rx 2026-03-23 20:29:53 +01:00
6429685cd2 Working fsk transmitter 2026-03-22 19:34:21 +01:00
f1f769e0e6 Starting fsk demod 2026-03-22 13:29:06 +01:00
f468cb3c6d Working tag system 2026-03-21 17:16:15 +01:00
582d876abf Tagged 2026-03-20 23:12:46 +01:00
ac5c9eeaa0 Switching things to tagged type 2026-03-20 20:22:36 +01:00
822cdc1587 Kinda working sync block system 2026-03-19 22:44:29 +01:00
6e2283755a Progress on syncblock macro 2026-03-19 21:53:07 +01:00
2baee8d28b Starts synchronous block interface 2026-03-19 15:28:31 +01:00
f727c119b8 Io 2026-03-18 23:33:49 +01:00
4aef173c7c Starting support for tags 2026-03-18 16:38:23 +01:00
37 changed files with 3138 additions and 559 deletions

221
Cargo.lock generated
View File

@ -127,6 +127,28 @@ dependencies = [
"zerocopy",
]
[[package]]
name = "alsa"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "812947049edcd670a82cd5c73c3661d2e58468577ba8489de58e1a73c04cbd5d"
dependencies = [
"alsa-sys",
"bitflags 2.11.0",
"cfg-if",
"libc",
]
[[package]]
name = "alsa-sys"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ad7569085a265dd3f607ebecce7458eaab2132a84393534c95b18dcbc3f31e04"
dependencies = [
"libc",
"pkg-config",
]
[[package]]
name = "android-activity"
version = "0.6.0"
@ -451,6 +473,15 @@ dependencies = [
"objc2 0.5.2",
]
[[package]]
name = "block2"
version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cdeb9d870516001442e364c5220d3574d2da8dc765554b4a617230d33fa58ef5"
dependencies = [
"objc2 0.6.4",
]
[[package]]
name = "blocking"
version = "1.6.2"
@ -592,6 +623,17 @@ dependencies = [
"libc",
]
[[package]]
name = "chacha20"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6f8d983286843e49675a4b7a2d174efe136dc93a18d69130dd18198a6c167601"
dependencies = [
"cfg-if",
"cpufeatures",
"rand_core",
]
[[package]]
name = "clipboard-win"
version = "5.4.1"
@ -692,6 +734,59 @@ dependencies = [
"libc",
]
[[package]]
name = "coreaudio-rs"
version = "0.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d15c3c3cee7c087938f7ad1c3098840b3ef1f1bdc7f6e496336c3b1e7a6f3914"
dependencies = [
"bitflags 2.11.0",
"libc",
"objc2-audio-toolbox",
"objc2-core-audio",
"objc2-core-audio-types",
"objc2-core-foundation",
]
[[package]]
name = "cpal"
version = "0.17.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d8942da362c0f0d895d7cac616263f2f9424edc5687364dfd1d25ef7eba506d7"
dependencies = [
"alsa",
"coreaudio-rs",
"dasp_sample",
"jni 0.21.1",
"js-sys",
"libc",
"mach2",
"ndk",
"ndk-context",
"num-derive",
"num-traits",
"objc2 0.6.4",
"objc2-audio-toolbox",
"objc2-avf-audio",
"objc2-core-audio",
"objc2-core-audio-types",
"objc2-core-foundation",
"objc2-foundation 0.3.2",
"wasm-bindgen",
"wasm-bindgen-futures",
"web-sys",
"windows 0.61.3",
]
[[package]]
name = "cpufeatures"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b2a41393f66f16b0823bb79094d54ac5fbd34ab292ddafb9a0456ac9f87d201"
dependencies = [
"libc",
]
[[package]]
name = "crc32fast"
version = "1.5.0"
@ -719,6 +814,12 @@ version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f27ae1dd37df86211c42e150270f82743308803d90a6f6e6651cd730d5e1732f"
[[package]]
name = "dasp_sample"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c87e182de0887fd5361989c677c4e8f5000cd9491d6d563161a8f3a5519fc7f"
[[package]]
name = "dispatch"
version = "0.2.0"
@ -1016,6 +1117,7 @@ dependencies = [
name = "example"
version = "0.1.0"
dependencies = [
"cpal",
"eframe",
"egui",
"egui_plot",
@ -1023,6 +1125,7 @@ dependencies = [
"num",
"oxydsp-dsp",
"oxydsp-flowgraph",
"rand",
]
[[package]]
@ -1210,6 +1313,7 @@ dependencies = [
"cfg-if",
"libc",
"r-efi 6.0.0",
"rand_core",
"wasip2",
"wasip3",
]
@ -1741,6 +1845,15 @@ version = "0.4.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897"
[[package]]
name = "mach2"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a1b95cd5421ec55b445b5ae102f5ea0e768de1f82bd3001e11f426c269c3aea"
dependencies = [
"libc",
]
[[package]]
name = "malloc_buf"
version = "0.0.6"
@ -1904,6 +2017,17 @@ dependencies = [
"num-traits",
]
[[package]]
name = "num-derive"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed3955f1a9c7c0c15e092f9c887db08b1fc683305fdf6eb6684f22555355e202"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "num-integer"
version = "0.1.46"
@ -2008,7 +2132,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e4e89ad9e3d7d297152b17d39ed92cd50ca8063a89a9fa569046d41568891eff"
dependencies = [
"bitflags 2.11.0",
"block2",
"block2 0.5.1",
"libc",
"objc2 0.5.2",
"objc2-core-data",
@ -2030,6 +2154,31 @@ dependencies = [
"objc2-foundation 0.3.2",
]
[[package]]
name = "objc2-audio-toolbox"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6948501a91121d6399b79abaa33a8aa4ea7857fe019f341b8c23ad6e81b79b08"
dependencies = [
"bitflags 2.11.0",
"libc",
"objc2 0.6.4",
"objc2-core-audio",
"objc2-core-audio-types",
"objc2-core-foundation",
"objc2-foundation 0.3.2",
]
[[package]]
name = "objc2-avf-audio"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "13a380031deed8e99db00065c45937da434ca987c034e13b87e4441f9e4090be"
dependencies = [
"objc2 0.6.4",
"objc2-foundation 0.3.2",
]
[[package]]
name = "objc2-cloud-kit"
version = "0.2.2"
@ -2037,7 +2186,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "74dd3b56391c7a0596a295029734d3c1c5e7e510a4cb30245f8221ccea96b009"
dependencies = [
"bitflags 2.11.0",
"block2",
"block2 0.5.1",
"objc2 0.5.2",
"objc2-core-location",
"objc2-foundation 0.2.2",
@ -2049,11 +2198,34 @@ version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a5ff520e9c33812fd374d8deecef01d4a840e7b41862d849513de77e44aa4889"
dependencies = [
"block2",
"block2 0.5.1",
"objc2 0.5.2",
"objc2-foundation 0.2.2",
]
[[package]]
name = "objc2-core-audio"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e1eebcea8b0dbff5f7c8504f3107c68fc061a3eb44932051c8cf8a68d969c3b2"
dependencies = [
"dispatch2",
"objc2 0.6.4",
"objc2-core-audio-types",
"objc2-core-foundation",
"objc2-foundation 0.3.2",
]
[[package]]
name = "objc2-core-audio-types"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a89f2ec274a0cf4a32642b2991e8b351a404d290da87bb6a9a9d8632490bd1c"
dependencies = [
"bitflags 2.11.0",
"objc2 0.6.4",
]
[[package]]
name = "objc2-core-data"
version = "0.2.2"
@ -2061,7 +2233,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "617fbf49e071c178c0b24c080767db52958f716d9eabdf0890523aeae54773ef"
dependencies = [
"bitflags 2.11.0",
"block2",
"block2 0.5.1",
"objc2 0.5.2",
"objc2-foundation 0.2.2",
]
@ -2073,7 +2245,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a180dd8642fa45cdb7dd721cd4c11b1cadd4929ce112ebd8b9f5803cc79d536"
dependencies = [
"bitflags 2.11.0",
"block2 0.6.2",
"dispatch2",
"libc",
"objc2 0.6.4",
]
@ -2096,7 +2270,7 @@ version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "55260963a527c99f1819c4f8e3b47fe04f9650694ef348ffd2227e8196d34c80"
dependencies = [
"block2",
"block2 0.5.1",
"objc2 0.5.2",
"objc2-foundation 0.2.2",
"objc2-metal",
@ -2108,7 +2282,7 @@ version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "000cfee34e683244f284252ee206a27953279d370e309649dc3ee317b37e5781"
dependencies = [
"block2",
"block2 0.5.1",
"objc2 0.5.2",
"objc2-contacts",
"objc2-foundation 0.2.2",
@ -2127,7 +2301,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ee638a5da3799329310ad4cfa62fbf045d5f56e3ef5ba4149e7452dcf89d5a8"
dependencies = [
"bitflags 2.11.0",
"block2",
"block2 0.5.1",
"dispatch",
"libc",
"objc2 0.5.2",
@ -2140,6 +2314,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3e0adef53c21f888deb4fa59fc59f7eb17404926ee8a6f59f5df0fd7f9f3272"
dependencies = [
"bitflags 2.11.0",
"block2 0.6.2",
"libc",
"objc2 0.6.4",
"objc2-core-foundation",
]
@ -2161,7 +2337,7 @@ version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1a1ae721c5e35be65f01a03b6d2ac13a54cb4fa70d8a5da293d7b0020261398"
dependencies = [
"block2",
"block2 0.5.1",
"objc2 0.5.2",
"objc2-app-kit 0.2.2",
"objc2-foundation 0.2.2",
@ -2174,7 +2350,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd0cba1276f6023976a406a14ffa85e1fdd19df6b0f737b063b95f6c8c7aadd6"
dependencies = [
"bitflags 2.11.0",
"block2",
"block2 0.5.1",
"objc2 0.5.2",
"objc2-foundation 0.2.2",
]
@ -2186,7 +2362,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e42bee7bff906b14b167da2bac5efe6b6a07e6f7c0a21a7308d40c960242dc7a"
dependencies = [
"bitflags 2.11.0",
"block2",
"block2 0.5.1",
"objc2 0.5.2",
"objc2-foundation 0.2.2",
"objc2-metal",
@ -2209,7 +2385,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b8bb46798b20cd6b91cbd113524c490f1686f4c4e8f49502431415f3512e2b6f"
dependencies = [
"bitflags 2.11.0",
"block2",
"block2 0.5.1",
"objc2 0.5.2",
"objc2-cloud-kit",
"objc2-core-data",
@ -2229,7 +2405,7 @@ version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "44fa5f9748dbfe1ca6c0b79ad20725a11eca7c2218bceb4b005cb1be26273bfe"
dependencies = [
"block2",
"block2 0.5.1",
"objc2 0.5.2",
"objc2-foundation 0.2.2",
]
@ -2241,7 +2417,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "76cfcbf642358e8689af64cee815d139339f3ed8ad05103ed5eaf73db8d84cb3"
dependencies = [
"bitflags 2.11.0",
"block2",
"block2 0.5.1",
"objc2 0.5.2",
"objc2-core-location",
"objc2-foundation 0.2.2",
@ -2556,6 +2732,23 @@ version = "6.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8dcc9c7d52a811697d2151c701e0d08956f92b0e24136cf4cf27b57a6a0d9bf"
[[package]]
name = "rand"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc266eb313df6c5c09c1c7b1fbe2510961e5bcd3add930c1e31f7ed9da0feff8"
dependencies = [
"chacha20",
"getrandom 0.4.2",
"rand_core",
]
[[package]]
name = "rand_core"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c8d0fd677905edcbeedbf2edb6494d676f0e98d54d5cf9bda0b061cb8fb8aba"
[[package]]
name = "range-alloc"
version = "0.1.5"
@ -4096,7 +4289,7 @@ dependencies = [
"android-activity",
"atomic-waker",
"bitflags 2.11.0",
"block2",
"block2 0.5.1",
"bytemuck",
"calloop 0.13.0",
"cfg_aliases",

View File

@ -11,3 +11,5 @@ egui_plot = "0.34.1"
eframe = { version = "0.33.3", features = ["default_fonts", "wayland"] }
num = "0.4.3"
hound = "3.5.1"
rand = "0.10.0"
cpal = "0.17.3"

Binary file not shown.

View File

@ -3,19 +3,23 @@
node [shape=record];
rankdir=TB;
IterSource_0 [label="{ IterSource |{<o0> output} }"];
Map_1 [label="{ {<i0> input}| Map |{<o0> output} }"];
Repeat_2 [label="{ {<i0> input}| Repeat |{<o0> output} }"];
Nco_3 [label="{ {<i0> frequency}| Nco |{<o0> output} }"];
OscillatorSource_4 [label="{ OscillatorSource |{<o0> output} }"];
Multiplier_5 [label="{ {<i0> input_a|<i1> input_b}| Multiplier |{<o0> output} }"];
TxSink_6 [label="{ {<i0> input}| TxSink }"];
FirFilter_1 [label="{ {<i0> input}| FirFilter |{<o0> output} }"];
Map_2 [label="{ {<i0> input}| Map |{<o0> output} }"];
Repeat_3 [label="{ {<i0> input}| Repeat |{<o0> output} }"];
Nco_4 [label="{ {<i0> frequency}| Nco |{<o0> output} }"];
OscillatorSource_5 [label="{ OscillatorSource |{<o0> output} }"];
Multiplier_6 [label="{ {<i0> input_a|<i1> input_b}| Multiplier |{<o0> output} }"];
MapResultTagged_7 [label="{ {<i0> input}| MapResultTagged |{<o0> output} }"];
NullSink_8 [label="{ {<i0> input}| NullSink }"];
IterSource_0:o0 -> Map_1:i0 [label="bool"];
Map_1:o0 -> Repeat_2:i0 [label="oxydsp_dsp::units::DigitalFrequency"];
Repeat_2:o0 -> Nco_3:i0 [label="oxydsp_dsp::units::DigitalFrequency"];
Nco_3:o0 -> Multiplier_5:i0 [label="num_complex::Complex<f32>"];
OscillatorSource_4:o0 -> Multiplier_5:i1 [label="num_complex::Complex<f32>"];
Multiplier_5:o0 -> TxSink_6:i0 [label="num_complex::Complex<f32>"];
IterSource_0:o0 -> Repeat_3:i0 [label="f32"];
FirFilter_1:o0 -> Map_2:i0 [label="f32"];
Map_2:o0 -> Nco_4:i0 [label="oxydsp_dsp::units::DigitalFrequency"];
Repeat_3:o0 -> FirFilter_1:i0 [label="f32"];
Nco_4:o0 -> Multiplier_6:i0 [label="num_complex::Complex<f32>"];
OscillatorSource_5:o0 -> Multiplier_6:i1 [label="num_complex::Complex<f32>"];
Multiplier_6:o0 -> MapResultTagged_7:i0 [label="num_complex::Complex<f32>"];
MapResultTagged_7:o0 -> NullSink_8:i0 [label="num_complex::Complex<f32>"];
}

BIN
example/output.wav Normal file

Binary file not shown.

View File

@ -1,138 +1,73 @@
use std::collections::VecDeque;
use std::fmt::Display;
use std::fs::File;
use std::io;
use std::io::Write;
use std::sync::mpmc::sync_channel;
use std::sync::mpsc;
use cpal::traits::DeviceTrait;
use cpal::traits::HostTrait;
use eframe::NativeOptions;
use egui::Color32;
use egui_plot::Line;
use egui_plot::PlotPoints;
use num::Complex;
use num::Zero;
use oxydsp_dsp::blocks::filtering::fir::FirFilter;
use oxydsp_dsp::blocks::iq::zero_if::ZeroIf;
use oxydsp_dsp::blocks::math::basic::Adder;
use oxydsp_dsp::blocks::math::basic::Multiplier;
use oxydsp_dsp::blocks::synthesis::Nco;
use oxydsp_dsp::blocks::synthesis::OscillatorSource;
use oxydsp_dsp::blocks::ted::early_late::EarlyLateGate;
use oxydsp_dsp::blocks::utilities::adapters::Map;
use oxydsp_dsp::blocks::utilities::adapters::MapResultTagged;
use oxydsp_dsp::blocks::utilities::adapters::NullSink;
use oxydsp_dsp::blocks::utilities::adapters::Repeat;
use oxydsp_dsp::blocks::utilities::channels::TxSink;
use oxydsp_dsp::blocks::utilities::adapters::Scan;
use oxydsp_dsp::blocks::utilities::adapters::ScanTagged;
use oxydsp_dsp::blocks::utilities::channels::RxSource;
use oxydsp_dsp::blocks::utilities::iter::IterSource;
use oxydsp_dsp::blocks::utilities::squelch::Squelch;
use oxydsp_dsp::filtering::fir::Fir;
use oxydsp_dsp::units::DigitalFrequency;
use oxydsp_flowgraph::BlockIO;
use oxydsp_flowgraph::block::Block;
use oxydsp_flowgraph::block::BlockResult;
use oxydsp_flowgraph::edge::In;
use oxydsp_flowgraph::edge::PopIterable;
use oxydsp_flowgraph::flowgraph;
use oxydsp_flowgraph::graph::FlowGraph;
use rand::random;
#[derive(BlockIO)]
pub struct Printer<T: 'static>
{
#[input]
input: In<T>,
use crate::receiver::RadioReceiver;
use crate::transmitter::Transmitter;
n: usize,
}
impl<T: 'static> Printer<T>
{
pub fn new(input: In<T>) -> Self
{
Self { input, n: 0 }
}
}
impl<T: 'static> Block for Printer<T>
where
T: Display,
{
fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult
{
for x in self.input.pop_iter()
{
if self.n.is_multiple_of(2usize.pow(20))
{
println!("{}", x);
self.n = 0;
}
self.n += 1;
}
BlockResult::Ok
}
}
pub mod receiver;
pub mod transmitter;
fn main()
{
let sample_rate = 48_000;
let sample_per_symbol = 96;
let deviation = DigitalFrequency::from_time_frequency(500., sample_rate as f64);
let carrier = DigitalFrequency::from_time_frequency(1000., sample_rate as f64);
let data = (0..255u8).flat_map(to_bits).collect::<Vec<_>>();
let (bit_stream, bits) = IterSource::new(data.into_iter());
let (to_freq, freq) = Map::new(bits, move |x| [-deviation, deviation][x as usize]);
let (repeat, freq) = Repeat::new(freq, sample_per_symbol);
let (base_oscillator, baseband) = Nco::<f32>::new(freq);
let (local_oscillator, lo) = OscillatorSource::<f32>::new(carrier.into());
let (frontend, passband) = Multiplier::new(baseband, lo);
let (tx, rx) = mpsc::channel::<Complex<f32>>();
let sink = TxSink::new(passband, tx);
let graph = flowgraph![
bit_stream,
to_freq,
repeat,
base_oscillator,
local_oscillator,
frontend,
sink,
];
File::create("out.dot")
.unwrap()
.write_all(graph.get_dot().as_bytes())
.unwrap();
let j = graph.run();
let mut output = vec![];
while let Ok(x) = rx.recv()
if std::env::args().len() == 2
{
output.push(x);
}
let _ = j.join();
println!("Transmitter");
let tx = Transmitter::start_new();
// Write signal
let spec = hound::WavSpec {
channels: 1,
sample_rate,
bits_per_sample: 16,
sample_format: hound::SampleFormat::Int,
};
let mut writer = hound::WavWriter::create("mod.wav", spec).unwrap();
for x in output.iter()
loop
{
let mut user_input = String::new();
io::stdin().read_line(&mut user_input).unwrap();
println!("Transmitting ...");
tx.transmit(user_input.as_bytes().to_vec());
}
}
else
{
let amplitude = i16::MAX as f32;
writer.write_sample((x.re * amplitude) as i16).unwrap();
println!("Receiver");
let _tx = RadioReceiver::start_new();
}
writer.finalize().unwrap();
//
eframe::run_simple_native("Plot", NativeOptions::default(), move |ctx, _frame| {
egui::CentralPanel::default().show(ctx, |ui| {
egui_plot::Plot::new("hello").show(ui, |plot_ui| {
plot_ui.line(Line::new(
"samples",
output
.iter()
.enumerate()
.map(|(i, s)| [i as f64, s.re as f64])
.collect::<PlotPoints>(),
));
});
ctx.request_repaint();
});
})
.unwrap();
}
pub const SAMPLE_RATE: usize = 48_000;
pub const SAMPLE_PER_SYMBOL: usize = 48;
pub const DEVIATION: f64 = 500.;
pub const CARRIER: f64 = 1700.;
pub fn to_bits(n: u8) -> [bool; 8]
{
[

336
example/src/receiver.rs Normal file
View File

@ -0,0 +1,336 @@
use std::collections::VecDeque;
use std::net::UdpSocket;
use std::sync::mpsc;
use std::sync::mpsc::Receiver;
use std::thread::JoinHandle;
use cpal::Stream;
use cpal::traits::DeviceTrait;
use cpal::traits::HostTrait;
use eframe::NativeOptions;
use egui::Color32;
use egui_plot::Line;
use egui_plot::PlotPoints;
use num::Complex;
use num::Zero;
use oxydsp_dsp::blocks::filtering::fir::FirFilter;
use oxydsp_dsp::blocks::iq::zero_if::ZeroIf;
use oxydsp_dsp::blocks::ted::early_late::EarlyLateGate;
use oxydsp_dsp::blocks::utilities::adapters::Map;
use oxydsp_dsp::blocks::utilities::adapters::NullSink;
use oxydsp_dsp::blocks::utilities::adapters::Scan;
use oxydsp_dsp::blocks::utilities::adapters::ScanTagged;
use oxydsp_dsp::blocks::utilities::channels::RxSource;
use oxydsp_dsp::blocks::utilities::squelch::Squelch;
use oxydsp_dsp::filtering::fir::Fir;
use oxydsp_dsp::units::DigitalFrequency;
use oxydsp_flowgraph::flowgraph;
use oxydsp_flowgraph::graph::FlowGraph;
use oxydsp_flowgraph::tag::Tag;
use oxydsp_flowgraph::tag::Tagged;
use crate::CARRIER;
use crate::DEVIATION;
use crate::SAMPLE_PER_SYMBOL;
use crate::SAMPLE_RATE;
pub enum PacketBuilderBitState
{
WaitingForPreamble,
InPacket,
}
pub enum PacketBuilderByteState
{
Length1,
Length2,
Data,
}
pub struct PacketBuilder
{
current_byte: u8,
bit_index: u8,
bit_state: PacketBuilderBitState,
packet_state: PacketBuilderByteState,
// Packet building
length: u16,
data: Vec<u8>,
}
impl PacketBuilder
{
pub fn new() -> Self
{
Self {
current_byte: 0,
bit_index: 0,
bit_state: PacketBuilderBitState::WaitingForPreamble,
packet_state: PacketBuilderByteState::Length1,
length: 0,
data: vec![],
}
}
fn next_byte(&mut self) -> Option<Vec<u8>>
{
match self.packet_state
{
PacketBuilderByteState::Length1 =>
{
self.length = 0;
self.length |= self.current_byte as u16;
println!("starting packet, length 1 {}", self.current_byte);
self.packet_state = PacketBuilderByteState::Length2;
}
PacketBuilderByteState::Length2 =>
{
println!("starting packet, length 2 {}", self.current_byte);
self.length |= (self.current_byte as u16) << 8;
self.data = vec![];
self.packet_state = PacketBuilderByteState::Data;
println!("length : {}", self.length);
}
PacketBuilderByteState::Data =>
{
self.data.push(self.current_byte);
self.length -= 1;
if self.length == 0
{
println!("finished");
let current = std::mem::replace(self, Self::new());
return Some(current.data);
}
}
}
None
}
pub fn next_bit(&mut self, bit: bool) -> Option<Vec<u8>>
{
self.current_byte >>= 1;
self.current_byte |= (bit as u8) << 7;
match self.bit_state
{
PacketBuilderBitState::WaitingForPreamble =>
{
if self.current_byte == 0b01100111
{
println!("preamble heard !");
self.bit_state = PacketBuilderBitState::InPacket;
self.bit_index = 0;
}
return None;
}
PacketBuilderBitState::InPacket =>
{
self.bit_index += 1;
if self.bit_index == 8
{
let out = self.next_byte();
self.bit_index = 0;
return out;
}
None
}
}
}
}
pub struct RadioReceiver {
//stream: Stream,
//pub packet_receiver: Receiver<Vec<u8>>,
}
impl RadioReceiver
{
pub fn start_new() -> Self
{
let carrier = DigitalFrequency::from_time_frequency(CARRIER, SAMPLE_RATE as f64);
let (audio_tx, audio_rx) = mpsc::channel();
let (packet_tx, packet_rx) = mpsc::channel::<Vec<u8>>();
let (source, signal) = RxSource::new(audio_rx);
let (inspect, signal) = Map::new(signal, |x| {
//println!("{x}");
x
});
let (mut zero_if, iq) = ZeroIf::new(signal, carrier.into());
zero_if.set_fir(Fir::lowpass(
DigitalFrequency::from_time_frequency(2. * DEVIATION + 100., SAMPLE_RATE as f64),
SAMPLE_PER_SYMBOL * 4,
));
let (squelch, iq) = Squelch::new(iq, 5., 100);
let (arg_extract, arg) = Scan::new(iq, Complex::zero(), |state, sample| {
let angle: Complex<f32> = sample / *state;
*state = sample;
angle.arg() * 14.
});
let mut elg_loop = Fir(vec![1. / 20.; 30]);
*elg_loop.0.last_mut().unwrap() = 1.;
let (elg, arg) = EarlyLateGate::new(arg, elg_loop, SAMPLE_PER_SYMBOL);
// // Eye diagram
let (tx, rx) = mpsc::channel::<(Vec<f32>, f32)>();
//let (eye_sender, arg) = ScanTagged::new(arg, VecDeque::<()>::new(), move |history, x| {
let (eye_sender, arg) = ScanTagged::new(arg, VecDeque::new(), move |history, x| {
let cloned_tag = x.1.clone();
if history.len() == 2 * SAMPLE_PER_SYMBOL
{
history.pop_back();
}
let mut error: f32 = 0.;
let is_symbol_center = x.1.as_ref().is_some_and(|t| {
if let Some(err) = t.retrieve("elg_symbol")
{
error = *err.downcast().unwrap();
true
}
else
{
false
}
});
history.push_front(((is_symbol_center, error), x.0));
if history.len() > SAMPLE_PER_SYMBOL && history[SAMPLE_PER_SYMBOL].0.0
{
let _ = tx.send((
history.iter().map(|(_, x)| *x).collect::<Vec<_>>(),
history[SAMPLE_PER_SYMBOL].0.1,
));
}
Tagged::new(x.0, None)
});
let (packet_map, arg) =
ScanTagged::new(arg, PacketBuilder::new(), move |builder, sample| {
if sample
.1
.as_ref()
.is_some_and(|t| t.retrieve("elg_symbol").is_some())
&& let Some(packet) = builder.next_bit(sample.0 < 0.)
{
let _ = packet_tx.send(packet);
}
Tagged::new(sample.0, None)
});
let null_sink = NullSink::new(arg);
let graph = flowgraph![
source,
inspect,
squelch,
zero_if,
packet_map,
arg_extract,
//sig_lowpass,
elg,
eye_sender,
null_sink
];
let t = graph.run();
// Setup input
// let host = cpal::default_host();
// let device = host.default_input_device().expect("No input device");
// let mut supported_configs_range = device
// .supported_input_configs()
// .expect("error while querying configs");
// let supported_config = supported_configs_range
// .next()
// .expect("no supported config?!")
// .with_sample_rate(SAMPLE_RATE as u32);
// let stream = device
// .build_input_stream(
// &supported_config.into(),
// move |data: &[f32], _: &cpal::InputCallbackInfo| {
// for x in data.iter()
// {
// let _ = audio_tx.send(*x);
// }
// },
// move |err| {
// panic!() // react to errors here.
// },
// None, // None=blocking, Some(Duration)=timeout
// )
// .unwrap();
std::thread::spawn(move || {
let socket = UdpSocket::bind("0.0.0.0:25565").unwrap();
let mut buffer = [0u8; 4096];
while let Ok(read) = socket.recv(&mut buffer)
{
let read_buffer = &mut buffer[0..read];
for x in read_buffer.chunks(4)
{
let val = f32::from_le_bytes([x[0], x[1], x[2], x[3]]);
let _ = audio_tx.send(val);
}
}
});
let mut eyes = VecDeque::new();
eframe::run_simple_native("Plot", NativeOptions::default(), move |ctx, _frame| {
while let Ok(x) = packet_rx.try_recv()
{
println!("Got data: {} bytes.", x.len());
let str: String = x.iter().map(|x| *x as char).collect();
println!("-----\n\n{}\n\n-----", str);
}
while let Ok(eye) = rx.try_recv()
{
if eyes.len() >= 100
{
let _ = eyes.pop_back();
}
eyes.push_front(eye);
}
egui::CentralPanel::default().show(ctx, |ui| {
egui_plot::Plot::new("hello").show(ui, |plot_ui| {
for eye in eyes.iter()
{
plot_ui.line(
Line::new(
"eyes",
eye.0
.iter()
.enumerate()
.map(|(i, s)| [i as f64 / 2., *s as f64])
.collect::<PlotPoints>(),
)
.id("eyes")
.color(Color32::GREEN),
);
}
});
ctx.request_repaint();
});
})
.unwrap();
Self {
//stream,
//packet_receiver: packet_rx,
}
}
}
pub fn color_from_err(error: f32, max: f32) -> Color32
{
Color32::RED
.linear_multiply(error.abs() / max)
.blend(Color32::GREEN.linear_multiply((1. - error.abs() / max).max(0.)))
}

297
example/src/transmitter.rs Normal file
View File

@ -0,0 +1,297 @@
use std::collections::VecDeque;
use std::fs::File;
use std::io::Write;
use std::iter::FusedIterator;
use std::net::UdpSocket;
use std::ops::BitXor;
use std::sync::mpsc;
use std::sync::mpsc::Receiver;
use std::sync::mpsc::Sender;
use std::sync::mpsc::SyncSender;
use std::sync::mpsc::sync_channel;
use std::thread::JoinHandle;
use std::time::Duration;
use cpal::Stream;
use cpal::traits::DeviceTrait;
use cpal::traits::HostTrait;
use eframe::NativeOptions;
use egui::Color32;
use egui::output;
use egui_plot::Line;
use egui_plot::PlotPoints;
use num::Complex;
use num::Zero;
use oxydsp_dsp::blocks::filtering::fir::FirFilter;
use oxydsp_dsp::blocks::iq::zero_if::ZeroIf;
use oxydsp_dsp::blocks::math::basic::Adder;
use oxydsp_dsp::blocks::math::basic::Multiplier;
use oxydsp_dsp::blocks::synthesis::Nco;
use oxydsp_dsp::blocks::synthesis::OscillatorSource;
use oxydsp_dsp::blocks::ted::early_late::EarlyLateGate;
use oxydsp_dsp::blocks::utilities::adapters::Map;
use oxydsp_dsp::blocks::utilities::adapters::MapResultTagged;
use oxydsp_dsp::blocks::utilities::adapters::NullSink;
use oxydsp_dsp::blocks::utilities::adapters::Repeat;
use oxydsp_dsp::blocks::utilities::adapters::Scan;
use oxydsp_dsp::blocks::utilities::adapters::ScanTagged;
use oxydsp_dsp::blocks::utilities::channels::RxSource;
use oxydsp_dsp::blocks::utilities::channels::TxSink;
use oxydsp_dsp::blocks::utilities::iter::IterSource;
use oxydsp_dsp::blocks::utilities::squelch::Squelch;
use oxydsp_dsp::filtering::fir::Fir;
use oxydsp_dsp::units::DigitalFrequency;
use oxydsp_flowgraph::BlockIO;
use oxydsp_flowgraph::block::Block;
use oxydsp_flowgraph::block::BlockResult;
use oxydsp_flowgraph::flowgraph;
use oxydsp_flowgraph::graph::FlowGraph;
use oxydsp_flowgraph::io::In;
use oxydsp_flowgraph::io::Out;
use rand::random;
use crate::CARRIER;
use crate::DEVIATION;
use crate::SAMPLE_PER_SYMBOL;
use crate::SAMPLE_RATE;
use crate::gaussian;
use crate::to_bits;
#[derive(BlockIO)]
pub struct FlatMap<I, O, F>
where
I: 'static,
O: IntoIterator + 'static,
O::IntoIter: FusedIterator,
F: Fn(I) -> O,
{
#[input]
input: In<I>,
#[output]
output: Out<O::Item>,
current_iter: Option<O::IntoIter>,
map: F,
}
impl<I, O, F> FlatMap<I, O, F>
where
I: 'static,
O: IntoIterator + 'static,
O::IntoIter: FusedIterator,
F: Fn(I) -> O,
{
pub fn new(input: In<I>, map: F) -> (Self, In<O::Item>)
{
let (output, port) = oxydsp_flowgraph::io::stream();
(
Self {
input,
output,
current_iter: None,
map,
},
port,
)
}
}
impl<I, O, F> Block for FlatMap<I, O, F>
where
I: 'static,
O: IntoIterator + 'static,
O::IntoIter: FusedIterator,
F: Fn(I) -> O,
{
fn work(&mut self) -> BlockResult
{
let writer = self.output.write();
let reader = self.input.read();
let max_write = writer.len();
let mut written = 0;
while written < max_write
{
if let Some(current_iter) = self.current_iter.as_mut()
{
if let Some(next_elt) = current_iter.next()
{
let _ = writer.push((next_elt, None).into());
written += 1;
continue;
}
else
{
// Iterator empty
self.current_iter = None;
}
}
if self.current_iter.is_none()
{
// Get input
if let Some(input) = reader.pop()
{
let mut new_iter = (self.map)(input.0).into_iter();
if let Some(first_elt) = new_iter.next()
{
self.current_iter = Some(new_iter);
let _ = writer.push((first_elt, input.1).into());
written += 1;
}
else
{
// Iterator empty
self.current_iter = None;
continue;
}
}
else
{
// Cannot continue
break;
}
}
}
BlockResult::Ok
}
}
pub struct Transmitter
{
flowgraph_handle: JoinHandle<()>,
packet_sender: SyncSender<Vec<u8>>,
stream: Stream,
}
impl Transmitter
{
pub fn start_new() -> Self
{
let carrier = DigitalFrequency::from_time_frequency(CARRIER, SAMPLE_RATE as f64);
let deviation = DigitalFrequency::from_time_frequency(DEVIATION, SAMPLE_RATE as f64);
let (packet_tx, packet_rx): (_, Receiver<Vec<u8>>) = sync_channel(128);
let (packet_rec, packets): (_, In<Vec<u8>>) = RxSource::new(packet_rx);
let (linearizer, bits) = FlatMap::new(packets, |packet| {
// +1 for chksum
let packet_length = packet.len() as u16;
let checksum = packet.iter().copied().reduce(BitXor::bitxor).unwrap();
// Learning sequence
let mut frame = vec![0b10101010; 8];
// Preamble
frame.push(0b01100111);
frame.push(packet_length.to_le_bytes()[0]);
frame.push(packet_length.to_le_bytes()[1]);
frame.extend(packet.iter());
frame.push(checksum);
frame.extend((0..16).map(|_| 0));
frame
.into_iter()
.flat_map(to_bits)
.map(|x| if x { 1. } else { -1. })
});
let (repeat, bits) = Repeat::new(bits, SAMPLE_PER_SYMBOL);
// gaussian fir
let fir = Fir((0..SAMPLE_PER_SYMBOL)
.map(|x| gaussian(0.1, x as f32 / SAMPLE_PER_SYMBOL as f32))
.collect())
.normalized();
//let (bit_filter, bits) = FirFilter::new(bits, fir);
let (to_freq, freq) = Map::new(bits, move |x| {
DigitalFrequency::from_time_frequency(DEVIATION * x as f64, SAMPLE_RATE as f64)
});
let (base_oscillator, baseband) = Nco::<f32>::new(freq);
let (local_oscillator, lo) = OscillatorSource::<f32>::new(carrier.into());
let (frontend, passband) = Multiplier::new(baseband, lo);
let (audio_tx, audio_rx) = mpsc::channel::<Complex<f32>>();
let reverb_length = 20;
// let (reverb, passband) = FirFilter::new(
// passband,
// Fir((0..reverb_length)
// .map(|x| (-15. * (x as f32) / (reverb_length as f32)).exp())
// .collect())
// .normalized(),
// );
let (udp_map, passband) = Scan::new(
passband,
UdpSocket::bind("127.0.0.1:25566").unwrap(),
|sckt, sample| {
sckt.send_to(
&(sample.re + ((random::<f32>() * 2.) - 1.) * 0.0).to_le_bytes(),
"127.0.0.1:25565",
)
.unwrap();
sample
},
);
let tx_sink = TxSink::new(passband, audio_tx);
let graph = flowgraph![
packet_rec,
linearizer,
//reverb,
//bit_filter,
udp_map,
to_freq,
repeat,
base_oscillator,
local_oscillator,
frontend,
tx_sink,
];
// Open output device
let host = cpal::default_host();
let device = host
.default_output_device()
.expect("no output device available");
let mut supported_configs_range = device
.supported_output_configs()
.expect("error while querying configs");
let supported_config = supported_configs_range
.next()
.expect("no supported config?!")
.with_sample_rate(SAMPLE_RATE as u32);
let stream = device
.build_output_stream(
&supported_config.into(),
move |data: &mut [f32], _: &cpal::OutputCallbackInfo| {
for x in data.iter_mut()
{
if let Ok(y) = audio_rx.try_recv()
{
*x = y.re;
}
else
{
*x = 0.;
}
}
},
move |err| panic!(),
None, // None=blocking, Some(Duration)=timeout
)
.unwrap();
Self {
flowgraph_handle: graph.run(),
packet_sender: packet_tx,
stream,
}
}
pub fn transmit(&self, data: Vec<u8>)
{
let _ = self.packet_sender.send(data);
}
}

View File

@ -1,3 +1,6 @@
pub mod filtering;
pub mod iq;
pub mod math;
pub mod synthesis;
pub mod ted;
pub mod utilities;

View File

@ -0,0 +1 @@
pub mod fir;

View File

@ -0,0 +1,58 @@
use oxydsp_flowgraph::BlockIO;
use oxydsp_flowgraph::block::SyncBlock;
use oxydsp_flowgraph::io::In;
use oxydsp_flowgraph::io::Out;
use oxydsp_flowgraph::sync_block;
use std::iter::Sum;
use std::ops::Mul;
use crate::filtering::fir::Fir;
#[derive(BlockIO)]
#[sync_block]
pub struct FirFilter<F, T, O>
where
T: Clone + 'static,
F: Mul<T, Output = O> + Clone + 'static,
O: Sum + 'static,
{
#[input]
input: In<T>,
#[output]
output: Out<O>,
filter: crate::filtering::fir::FirFilter<F, T, O>,
}
impl<F, T, O> FirFilter<F, T, O>
where
T: Clone + 'static,
F: Mul<T, Output = O> + Clone + 'static,
O: Sum + 'static,
{
pub fn new(input: In<T>, impulse_response: Fir<F>) -> (Self, In<O>)
{
let (output, filtered) = oxydsp_flowgraph::io::stream();
(
Self {
input,
output,
filter: crate::filtering::fir::FirFilter::new(impulse_response),
},
filtered,
)
}
}
impl<'view, F, T, O> SyncBlock<'view> for FirFilter<F, T, O>
where
T: Clone + 'view,
F: Mul<T, Output = O> + Clone + 'static,
O: Sum + 'static,
{
fn sync_work(state: Self::StateView, input: Self::Input) -> Option<Self::Output>
{
Some(state.filter.next(input))
}
}

View File

@ -0,0 +1 @@
pub mod zero_if;

View File

@ -0,0 +1,65 @@
use num::{Complex, Float};
use oxydsp_flowgraph::{
BlockIO,
block::SyncBlock,
io::{In, Out},
sync_block,
};
use rustfft::FftNum;
use crate::{
filtering::fir::{Fir, FirFilter},
synthesis::oscillator::Nco,
};
#[derive(BlockIO)]
#[sync_block]
pub struct ZeroIf<T: std::clone::Clone + num::Num + Float + From<f32> + 'static>
{
#[input]
input: In<T>,
#[output]
output: Out<Complex<T>>,
local_oscillator: Nco<T>,
filter: FirFilter<Complex<T>, Complex<T>, Complex<T>>,
}
impl<T> ZeroIf<T>
where
T: std::clone::Clone + num::Num + FftNum + From<f32> + 'static + num::Float,
{
pub fn new(input: In<T>, lo: Nco<T>) -> (Self, In<Complex<T>>)
{
let (output, port) = oxydsp_flowgraph::io::stream();
(
Self {
input,
output,
local_oscillator: lo,
filter: FirFilter::new(Fir::lowpass(lo.frequency(), 100)),
},
port,
)
}
pub fn set_fir(&mut self, fir: Fir<Complex<T>>)
{
self.filter = FirFilter::new(fir);
}
}
impl<'view, T> SyncBlock<'view> for ZeroIf<T>
where
T: std::clone::Clone + num::Num + Float + From<f32> + 'static + num::Float,
{
fn sync_work(state: Self::StateView, input: Self::Input) -> Option<Self::Output>
{
// Mix
let lo_sample = state.local_oscillator.next().unwrap();
let iq = Complex::new(input * lo_sample.re, input * lo_sample.im);
Some(state.filter.next(iq))
}
}

View File

@ -1,12 +1,18 @@
use std::ops::{Add, Mul};
use std::ops::Add;
use std::ops::Mul;
use oxydsp_flowgraph::{
BlockIO,
block::{Block, BlockResult},
edge::{In, Out, PopIterable, stream},
};
use oxydsp_flowgraph::BlockIO;
use oxydsp_flowgraph::block::Block;
use oxydsp_flowgraph::block::BlockResult;
use oxydsp_flowgraph::block::SyncBlock;
use oxydsp_flowgraph::io::In;
use oxydsp_flowgraph::io::Out;
use oxydsp_flowgraph::io::PopIterable;
use oxydsp_flowgraph::sync_block;
use oxydsp_flowgraph::tag::TagMergable;
#[derive(BlockIO)]
#[sync_block]
pub struct Adder<Ia, Ib, O>
where
Ia: Add<Ib, Output = O> + 'static,
@ -31,7 +37,7 @@ where
{
pub fn new(input_a: In<Ia>, input_b: In<Ib>) -> (Self, In<O>)
{
let (output, added) = stream();
let (output, added) = oxydsp_flowgraph::io::stream();
(
Self {
input_a,
@ -43,23 +49,35 @@ where
}
}
impl<Ia, Ib, O> Block for Adder<Ia, Ib, O>
impl<'view, Ia, Ib, O> SyncBlock<'view> for Adder<Ia, Ib, O>
where
Ia: Add<Ib, Output = O> + 'static,
Ib: 'static,
O: 'static,
{
fn work(&mut self) -> BlockResult
fn sync_work(_state: Self::StateView, input: Self::Input) -> Option<Self::Output>
{
self.output.push_iter(
(&mut self.input_a, &mut self.input_b)
.pop_iter()
.map(|(a, b)| a + b),
);
BlockResult::Ok
Some(input.0 + input.1)
}
}
// impl<Ia, Ib, O> Block for Adder<Ia, Ib, O>
// where
// Ia: Add<Ib, Output = O> + 'static,
// Ib: 'static,
// O: 'static,
// {
// fn work(&mut self) -> BlockResult
// {
// self.output.push_iter(
// (&mut self.input_a, &mut self.input_b)
// .pop_iter()
// .map(|(a, b)| (a.0 + b.0, a.1.merge(&b.1))),
// );
// BlockResult::Ok
// }
// }
#[derive(BlockIO)]
pub struct Multiplier<Ia, Ib, O>
where
@ -85,7 +103,7 @@ where
{
pub fn new(input_a: In<Ia>, input_b: In<Ib>) -> (Self, In<O>)
{
let (output, added) = stream();
let (output, added) = oxydsp_flowgraph::io::stream();
(
Self {
input_a,
@ -108,7 +126,7 @@ where
self.output.push_iter(
(&mut self.input_a, &mut self.input_b)
.pop_iter()
.map(|(a, b)| a * b),
.map(|(a, b)| (a.0 * b.0, a.1.merge(&b.1)).into()),
);
BlockResult::Ok
}

View File

@ -4,10 +4,10 @@ use num::Float;
use oxydsp_flowgraph::BlockIO;
use oxydsp_flowgraph::block::Block;
use oxydsp_flowgraph::block::BlockResult;
use oxydsp_flowgraph::edge::In;
use oxydsp_flowgraph::edge::Out;
use oxydsp_flowgraph::edge::PopIterable;
use oxydsp_flowgraph::edge::stream;
use oxydsp_flowgraph::io::In;
use oxydsp_flowgraph::io::Out;
use oxydsp_flowgraph::io::PopIterable;
use oxydsp_flowgraph::io::stream;
#[derive(BlockIO)]
pub struct OscillatorSource<T: Float + From<f32> + 'static>
@ -31,7 +31,8 @@ impl<T: Float + From<f32> + 'static> Block for OscillatorSource<T>
{
fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult
{
self.output.push_iter(&mut self.nco);
self.output
.push_iter((&mut self.nco).map(|x| (x, None).into()));
BlockResult::Ok
}
}
@ -81,8 +82,8 @@ impl<T: Float + From<f32> + 'static> Block for Nco<T>
{
self.output
.push_iter(&mut self.frequency.pop_iter().map(|f| {
self.nco.set_frequency(f);
self.nco.next().unwrap()
self.nco.set_frequency(f.0);
(self.nco.next().unwrap(), f.1).into()
}));
BlockResult::Ok
}

View File

@ -0,0 +1 @@
pub mod early_late;

View File

@ -0,0 +1,112 @@
use std::collections::VecDeque;
use std::iter::Sum;
use num::Float;
use num::NumCast;
use oxydsp_flowgraph::BlockIO;
use oxydsp_flowgraph::block::SyncBlock;
use oxydsp_flowgraph::io::In;
use oxydsp_flowgraph::io::Out;
use oxydsp_flowgraph::sync_block;
use oxydsp_flowgraph::tag::Tag;
use crate::filtering::fir::Fir;
use crate::filtering::fir::FirFilter;
#[derive(BlockIO)]
#[sync_block(tagged)]
pub struct EarlyLateGate<T: Float + Send + Sync + Sum + Clone + NumCast + 'static>
{
#[input]
input: In<T>,
#[output]
output: Out<T>,
symbol_length: usize,
// Window looking at symbol_length samples at a time
window: VecDeque<T>,
// The current location of the window, in relation to the last sample
window_location: usize,
window_center: usize,
// The next window location, in relation to the last sample such that the window is centered on
// a symbol center (hopefully)
next_sample: f32,
loop_filter: FirFilter<T, T, T>,
}
impl<T> EarlyLateGate<T>
where
T: Float + Sum + Clone + 'static + Send + Sync + NumCast,
{
pub fn new(input: In<T>, loop_filter: Fir<T>, symbol_length: usize) -> (Self, In<T>)
{
let (output, samples) = oxydsp_flowgraph::io::stream();
(
Self {
input,
output,
window: VecDeque::with_capacity(symbol_length),
symbol_length,
window_location: 0,
window_center: symbol_length / 2,
next_sample: symbol_length as f32, // We assume that the first symbol is 1.5 windows into
// the stream
loop_filter: FirFilter::new(loop_filter),
},
samples,
)
}
}
impl<'view, T> SyncBlock<'view> for EarlyLateGate<T>
where
T: Float + Sum + Clone + 'static + Send + Sync + NumCast,
{
fn sync_work(state: Self::StateView, input: Self::Input) -> Option<Self::Output>
{
if state.window.len() < *state.symbol_length
{
state.window.push_back(input.0);
*state.window_location += 1;
return Some(input.0.into());
}
// Bring new sample in
state.window.pop_front();
state.window.push_back(input.0);
*state.window_location += 1;
let sample = state.window[*state.window_center];
let mut tag = None;
if *state.window_location >= *state.next_sample as usize
{
let early_index = *state.window_center - (0.25 * *state.symbol_length as f32) as usize;
let late_index = *state.window_center + (0.25 * *state.symbol_length as f32) as usize;
let early_sample = state.window[early_index];
let late_sample = state.window[late_index];
let error = (late_sample - early_sample) * sample;
let correction = state.loop_filter.next(error);
// Figure out next sample location
*state.next_sample +=
(*state.symbol_length as f32 + correction.to_f32().unwrap()).max(0.);
// Turn everything back relative to current sample
*state.next_sample -= *state.window_location as f32;
*state.window_location = 0;
let new_tag = Tag::default();
new_tag.tag("elg_symbol", error);
tag = Some(new_tag);
}
Some((sample, tag).into())
}
}

View File

@ -1,3 +1,4 @@
pub mod adapters;
pub mod channels;
pub mod iter;
pub mod squelch;

View File

@ -1,8 +1,17 @@
use oxydsp_flowgraph::{
BlockIO,
block::{Block, BlockResult},
edge::{In, Out, PopIterable, stream},
};
use core::sync;
use oxydsp_flowgraph::BlockIO;
use oxydsp_flowgraph::block::Block;
use oxydsp_flowgraph::block::BlockResult;
use oxydsp_flowgraph::block::SyncBlock;
use oxydsp_flowgraph::io::In;
use oxydsp_flowgraph::io::Out;
use oxydsp_flowgraph::io::PopIterable;
use oxydsp_flowgraph::io::stream;
use oxydsp_flowgraph::sync_block;
use oxydsp_flowgraph::tag::Tag;
use oxydsp_flowgraph::tag::TagMergable;
use oxydsp_flowgraph::tag::Tagged;
#[derive(BlockIO)]
pub struct Map<I: 'static, O: 'static, F>
@ -33,11 +42,217 @@ where
{
fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult
{
self.output.push_iter(self.input.pop_iter().map(&self.map));
self.output.push_iter(
self.input
.pop_iter()
.map(|x| ((&self.map)(x.0), x.1).into()),
);
BlockResult::Ok
}
}
#[derive(BlockIO)]
pub struct MapResult<I: 'static, O: 'static, F>
{
#[input]
input: In<I>,
#[output]
output: Out<O>,
map: F,
}
impl<I: 'static, O: 'static, F> MapResult<I, O, F>
where
F: Fn(I) -> (O, BlockResult),
{
pub fn new(input: In<I>, map: F) -> (Self, In<O>)
{
let (output, mapped) = stream();
(Self { input, output, map }, mapped)
}
}
impl<I: 'static, O: 'static, F> Block for MapResult<I, O, F>
where
F: Fn(I) -> (O, BlockResult),
{
fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult
{
let writer = self.output.write();
let reader = self.input.read();
for _ in 0..(writer.len().min(reader.len()))
{
let (input, tag_opt) = reader.pop().unwrap().into();
let (output, result) = (self.map)(input);
let _ = writer.push((output, tag_opt).into());
match result
{
BlockResult::Terminated | BlockResult::Exit =>
{
return result;
}
BlockResult::Ok =>
{}
}
}
BlockResult::Ok
}
}
#[derive(BlockIO)]
pub struct MapResultTagged<I: 'static, O: 'static, F>
{
#[input]
input: In<I>,
#[output]
output: Out<O>,
map: F,
}
impl<I: 'static, O: 'static, F> MapResultTagged<I, O, F>
where
F: Fn(Tagged<I>) -> (Tagged<O>, BlockResult),
{
pub fn new(input: In<I>, map: F) -> (Self, In<O>)
{
let (output, mapped) = stream();
(Self { input, output, map }, mapped)
}
}
impl<I: 'static, O: 'static, F> Block for MapResultTagged<I, O, F>
where
F: Fn(Tagged<I>) -> (Tagged<O>, BlockResult),
{
fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult
{
let writer = self.output.write();
let reader = self.input.read();
for _ in 0..(writer.len().min(reader.len()))
{
let (input, tag_opt) = reader.pop().unwrap().into();
let (tagged_out, result) = (self.map)((input, tag_opt.clone()).into());
let (output, tag_out) = tagged_out.into();
let _ = writer.push((output, tag_opt.merge(&tag_out)).into());
match result
{
BlockResult::Terminated | BlockResult::Exit =>
{
return result;
}
BlockResult::Ok =>
{}
}
}
BlockResult::Ok
}
}
#[derive(BlockIO)]
#[sync_block]
pub struct Scan<I: 'static, O: 'static, S, F>
where
F: Fn(&mut S, I) -> O,
{
#[input]
input: In<I>,
#[output]
output: Out<O>,
state: S,
map: F,
}
impl<I: 'static, O: 'static, S, F> Scan<I, O, S, F>
where
F: Fn(&mut S, I) -> O,
{
pub fn new(input: In<I>, initial_state: S, map: F) -> (Self, In<O>)
{
let (output, mapped) = stream();
(
Self {
input,
output,
state: initial_state,
map,
},
mapped,
)
}
}
impl<'view, I, O, S, F> SyncBlock<'view> for Scan<I, O, S, F>
where
I: 'static,
O: 'static,
S: 'view,
F: Fn(&mut S, I) -> O + 'view,
{
fn sync_work(state: Self::StateView, input: Self::Input) -> Option<Self::Output>
{
Some((*state.map)(state.state, input))
}
}
#[derive(BlockIO)]
#[sync_block(tagged)]
pub struct ScanTagged<I: 'static, O: 'static, S, F>
where
F: Fn(&mut S, Tagged<I>) -> Tagged<O>,
{
#[input]
input: In<I>,
#[output]
output: Out<O>,
state: S,
map: F,
}
impl<I: 'static, O: 'static, S, F> ScanTagged<I, O, S, F>
where
F: Fn(&mut S, Tagged<I>) -> Tagged<O>,
{
pub fn new(input: In<I>, initial_state: S, map: F) -> (Self, In<O>)
{
let (output, mapped) = stream();
(
Self {
input,
output,
state: initial_state,
map,
},
mapped,
)
}
}
impl<'view, I, O, S, F> SyncBlock<'view> for ScanTagged<I, O, S, F>
where
I: 'static,
O: 'static,
S: 'view,
F: Fn(&mut S, Tagged<I>) -> Tagged<O> + 'view,
{
fn sync_work(state: Self::StateView, input: Self::Input) -> Option<Self::Output>
{
Some((*state.map)(state.state, input))
}
}
#[derive(BlockIO)]
pub struct Repeat<T: 'static>
{
@ -46,7 +261,7 @@ pub struct Repeat<T: 'static>
repetitions: usize,
remaining: usize,
current: Option<T>,
current: Option<(T, Option<Tag>)>,
#[output]
output: Out<T>,
@ -82,16 +297,54 @@ impl<T: Clone + 'static> Block for Repeat<T>
{
if self.remaining == 0 || self.current.is_none()
{
self.current = Some(reader.pop().unwrap());
self.remaining = self.repetitions;
if let Some(x) = reader.pop()
{
self.current = Some(x.into());
self.remaining = self.repetitions;
}
else
{
return BlockResult::Ok;
}
}
writer
.push(self.current.clone().unwrap())
.push(self.current.clone().unwrap().into())
.unwrap_or_else(|_| panic!());
if let Some((_, tag)) = &mut self.current
{
*tag = None;
}
self.remaining -= 1;
}
BlockResult::Ok
}
}
#[derive(BlockIO)]
#[sync_block]
pub struct NullSink<T: 'static>
{
#[input]
input: In<T>,
}
impl<T: 'static> NullSink<T>
{
pub fn new(input: In<T>) -> Self
{
Self { input }
}
}
impl<'view, I: 'static> SyncBlock<'view> for NullSink<I>
{
fn sync_work(_: Self::StateView, _: Self::Input) -> Option<Self::Output>
{
// Don't do shit !
Some(())
}
}

View File

@ -1,10 +1,14 @@
use std::sync::mpsc::{Receiver, Sender, SyncSender};
use std::sync::mpsc::Receiver;
use std::sync::mpsc::Sender;
use std::sync::mpsc::SyncSender;
use oxydsp_flowgraph::{
BlockIO,
block::{Block, BlockResult},
edge::{In, Out, PopIterable, stream},
};
use oxydsp_flowgraph::BlockIO;
use oxydsp_flowgraph::block::Block;
use oxydsp_flowgraph::block::BlockResult;
use oxydsp_flowgraph::io::In;
use oxydsp_flowgraph::io::Out;
use oxydsp_flowgraph::io::PopIterable;
use oxydsp_flowgraph::io::stream;
#[derive(BlockIO)]
pub struct RxSource<Rx, I: 'static>
@ -45,7 +49,9 @@ impl<I: 'static> Block for RxSource<Receiver<I>, I>
{
fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult
{
if self.output.push_iter(self.input.iter())
if self
.output
.push_iter(self.input.try_iter().map(|x| (x, None).into()))
{
BlockResult::Ok
}
@ -63,7 +69,7 @@ impl<I: 'static> Block for TxSink<Sender<I>, I>
if self
.input
.pop_iter()
.map(|x| self.output.send(x))
.map(|x| self.output.send(x.0))
.any(|res| res.is_err())
{
BlockResult::Terminated
@ -82,7 +88,7 @@ impl<I: 'static> Block for TxSink<SyncSender<I>, I>
if self
.input
.pop_iter()
.map(|x| self.output.send(x))
.map(|x| self.output.send(x.0))
.any(|res| res.is_err())
{
BlockResult::Terminated

View File

@ -1,15 +1,21 @@
use oxydsp_flowgraph::{
BlockIO,
block::{Block, BlockResult},
edge::{In, Out, stream},
};
use std::iter::Peekable;
use oxydsp_flowgraph::BlockIO;
use oxydsp_flowgraph::block::Block;
use oxydsp_flowgraph::block::BlockResult;
use oxydsp_flowgraph::block::SyncBlock;
use oxydsp_flowgraph::io::In;
use oxydsp_flowgraph::io::Out;
use oxydsp_flowgraph::io::stream;
use oxydsp_flowgraph::sync_block;
use oxydsp_flowgraph::tag::Tag;
#[derive(BlockIO)]
pub struct IterSource<I: Iterator>
where
I::Item: 'static,
{
iter: I,
iter: Peekable<I>,
#[output]
output: Out<I::Item>,
@ -23,7 +29,13 @@ where
pub fn new(iter: I) -> (Self, In<I::Item>)
{
let (output, items) = stream();
(Self { iter, output }, items)
(
Self {
iter: iter.peekable(),
output,
},
items,
)
}
}
@ -34,13 +46,22 @@ where
{
fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult
{
if self.output.push_iter(&mut self.iter)
let writer = self.output.write();
for _ in 0..writer.len()
{
BlockResult::Ok
}
else
{
BlockResult::Terminated
if let Some(element) = self.iter.next()
{
let mut tag = None;
if self.iter.peek().is_none()
{
let new_tag = Tag::default();
new_tag.tag("itersource_finished", ());
tag = Some(new_tag);
}
let _ = writer.push((element, tag).into());
}
}
BlockResult::Ok
}
}

View File

@ -0,0 +1,79 @@
use std::{collections::VecDeque, iter::Sum};
use num::{Float, FromPrimitive, One, Zero, complex::ComplexFloat};
use oxydsp_flowgraph::{
BlockIO,
block::{Block, BlockResult},
io::{In, Out, PopIterable},
};
use crate::filtering::fir::{Fir, FirFilter};
#[derive(BlockIO)]
pub struct Squelch<T>
where
T: ComplexFloat + 'static,
T::Real: Float + One + Zero + FromPrimitive + Sum + Clone,
{
#[input]
input: In<T>,
#[output]
output: Out<T>,
trigger_level: T::Real,
history: VecDeque<T::Real>,
sum: T::Real,
divider: T::Real,
}
impl<T> Squelch<T>
where
T: ComplexFloat + 'static,
T::Real: Float + Sum + Clone + FromPrimitive,
{
pub fn new(input: In<T>, trigger_level: T::Real, mean_length: usize) -> (Self, In<T>)
{
let (output, port) = oxydsp_flowgraph::io::stream();
(
Self {
input,
output,
trigger_level,
history: VecDeque::from(vec![T::Real::zero(); mean_length]),
sum: T::Real::zero(),
divider: T::Real::from_usize(mean_length).unwrap(),
},
port,
)
}
}
impl<T> Block for Squelch<T>
where
T: ComplexFloat + 'static,
T::Real: Float + Sum + Clone + FromPrimitive,
{
fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult
{
let writer = self.output.write();
for x in self.input.pop_iter().take(writer.len())
{
let (element, tag) = x.into();
let oldest = self.history.pop_front().unwrap();
let newest = element.abs();
self.history.push_back(newest);
self.sum = self.sum - oldest;
self.sum = self.sum + newest;
if (self.sum / self.divider) > self.trigger_level
{
let _ = writer.push((element, tag).into());
}
}
BlockResult::Ok
}
}

View File

@ -0,0 +1 @@
pub mod fir;

View File

@ -0,0 +1,120 @@
use std::collections::VecDeque;
use std::f64::consts::PI;
use std::iter::Sum;
use std::ops::Div;
use std::ops::Mul;
use std::process::Output;
use num::Complex;
use num::Float;
use num::One;
use num::Zero;
use num::complex::ComplexFloat;
use num::zero;
use rustfft::FftNum;
use rustfft::FftPlanner;
use crate::map;
use crate::units::DigitalFrequency;
/// Finite impulse response
pub struct Fir<T>(pub Vec<T>);
impl<T> Fir<Complex<T>>
where
T: FftNum + Float + Clone,
{
pub fn from_transfer_function(tf: impl AsRef<[Complex<T>]>) -> Fir<Complex<T>>
{
let mut planner = FftPlanner::new();
let tf_len = tf.as_ref().len();
let ifft = planner.plan_fft_inverse(tf.as_ref().len());
let mut fir = tf.as_ref().to_vec();
ifft.process(fir.as_mut_slice());
let mut shifted_fir = vec![];
for i in 0..tf_len
{
let k = (tf_len - (tf_len / 2) + i) % tf_len;
shifted_fir.push(fir[k]);
}
Fir(shifted_fir)
}
pub fn lowpass(cutoff: DigitalFrequency, length: usize) -> Fir<Complex<T>>
{
let mut tf = vec![Complex::<T>::zero(); length];
let cutoff_bin = map(cutoff.as_rad(), 0., 2. * PI, 0., length as f64).floor() as usize;
for i in 0..cutoff_bin
{
tf[i] = Complex::<T>::one();
tf[length - i - 1] = Complex::<T>::one();
}
Self::from_transfer_function(tf)
}
}
impl<T> Fir<T>
where
T: ComplexFloat + Div<T::Real, Output = T> + Copy + Sum,
T::Real: Float,
{
pub fn normalized(mut self) -> Self
{
let sum: T = self.0.iter().copied().sum();
let len = Float::sqrt(sum.im() * sum.im() + sum.re() * sum.re());
self.0.iter_mut().for_each(|x| *x = *x / len);
self
}
}
pub struct FirFilter<F, T, O>
where
F: Mul<T, Output = O>,
O: Sum,
{
fir: Vec<F>,
taps: VecDeque<T>,
}
impl<F, T, O> FirFilter<F, T, O>
where
T: Clone,
F: Mul<T, Output = O> + Clone,
O: Sum,
{
pub fn new(impulse_response: Fir<F>) -> Self
{
let len = impulse_response.0.len();
Self {
fir: impulse_response.0,
taps: VecDeque::with_capacity(len),
}
}
pub fn next(&mut self, input: T) -> O
{
if self.taps.len() == self.fir.len()
{
self.taps.pop_front();
}
self.taps.push_back(input);
self.fir
.iter()
.zip(self.taps.iter())
.map(|(a, b)| a.clone() * b.clone())
.sum()
}
}
// Completely stolen from sdrpp code
pub fn estimate_fir_length(transition_width: f64, sample_rate: f64) -> f64
{
3.8 * sample_rate / transition_width
}

View File

@ -1,6 +1,7 @@
use num::Float;
pub mod blocks;
pub mod filtering;
pub mod synthesis;
pub mod units;

View File

@ -30,6 +30,11 @@ impl<T> Nco<T>
}
}
pub fn frequency(&self) -> DigitalFrequency
{
DigitalFrequency(self.d_phase)
}
pub fn with_phase(frequency: DigitalFrequency, phase: Phase) -> Self
{
Self {

View File

@ -1,4 +1,5 @@
use std::{f64::consts::PI, ops::Neg};
use std::f64::consts::PI;
use std::ops::Neg;
use crate::map;

View File

@ -1,13 +1,43 @@
use proc_macro::TokenStream;
use zyn::FromInput;
use zyn::ToTokens;
use zyn::ext::AttrExt;
use zyn::ext::FieldsExt;
use zyn::ext::ItemExt;
use zyn::format_ident;
use zyn::ident;
use zyn::parse_input;
use zyn::syn::Attribute;
use zyn::syn::GenericParam;
use zyn::syn::Index;
use zyn::syn::Lit;
use zyn::syn::TypeGenerics;
use zyn::syn::parse_quote;
use zyn::syn::punctuated::Punctuated;
use zyn::syn::spanned::Spanned;
mod sync;
#[derive(zyn::Attribute, Clone, Copy)]
#[zyn("sync_block", about = "Convenient derivation for synchronous blocks")]
struct SyncBlockConfig
{
#[zyn(default)]
tagged: bool,
}
#[zyn::attribute]
pub fn sync_block(#[zyn(input)] item: zyn::syn::ItemStruct, args: zyn::Args) -> zyn::TokenStream
{
let config = SyncBlockConfig {
tagged: args.iter().any(|x| x.as_flag() == "tagged"),
};
use sync::SyncBlockImpl;
zyn::zyn!(
@sync_block_impl(item = item, config = config)
)
}
#[zyn::derive("BlockIO", attributes(input, output), debug = "pretty")]
pub fn block_io(
#[zyn(input)] ident: zyn::Extract<zyn::syn::Ident>,
@ -17,6 +47,7 @@ pub fn block_io(
{
let ident = ident.inner();
let (impl_generics, type_generics, where_clause) = generics.split_for_impl();
zyn::zyn!(
impl {{impl_generics}} oxydsp_flowgraph::block::BlockIO for {{ ident.clone() }} {{ type_generics }}
{{ where_clause }}
@ -146,7 +177,7 @@ fn block_io_set_streams(fields: zyn::syn::Fields) -> zyn::TokenStream
fn set_anonymous_out_stream(
&mut self,
output_index: usize,
producer: oxydsp_flowgraph::edge::AnonymousStreamProducer,
producer: oxydsp_flowgraph::io::AnonymousStreamProducer,
)
{
match output_index
@ -160,7 +191,7 @@ fn block_io_set_streams(fields: zyn::syn::Fields) -> zyn::TokenStream
}
#[allow(unreachable_code)]
fn set_anonymous_in_stream(&mut self, input_index: usize, consumer: oxydsp_flowgraph::edge::AnonymousStreamConsumer)
fn set_anonymous_in_stream(&mut self, input_index: usize, consumer: oxydsp_flowgraph::io::AnonymousStreamConsumer)
{
match input_index
{
@ -178,29 +209,26 @@ fn block_io_set_streams(fields: zyn::syn::Fields) -> zyn::TokenStream
fn block_io_create_stream(fields: zyn::syn::Fields) -> zyn::TokenStream
{
zyn::zyn!(
#[allow(unreachable_code)]
fn create_anonymous_stream_for(
#[allow(unreachable_code)]
fn create_anonymous_stream_for(
&mut self,
output_index: usize,
capacity: usize
) -> (oxydsp_flowgraph::edge::AnonymousStreamProducer, oxydsp_flowgraph::edge::AnonymousStreamConsumer)
{
let (tx, rx): (oxydsp_flowgraph::edge::AnonymousStreamProducer, oxydsp_flowgraph::edge::AnonymousStreamConsumer)
= match output_index
capacity: usize,
) -> (
oxydsp_flowgraph::io::AnonymousStreamProducer,
oxydsp_flowgraph::io::AnonymousStreamConsumer,
)
{
@for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("output"))).enumerate())
let output = match output_index
{
{{ field.0 }} =>
@for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("output"))).enumerate())
{
let (tx, rx) = oxydsp_flowgraph::stream::bounded_queue::<@out_inner_type(ty = field.1.ty.clone())>(capacity);
(tx.into(), rx.into())
},
}
_ => panic!("output_index out of bounds.")
};
(tx, rx)
}
{{ field.0 }} => self.{{ field.1.ident }}.create_anonymous_stream(capacity),
}
_ => panic!("output_index out of bounds."),
};
return output;
}
)
}
@ -229,90 +257,6 @@ fn out_inner_type(ty: zyn::syn::Type) -> zyn::TokenStream
out_ty.unwrap()
}
// Sync block
#[zyn::attribute]
pub fn sync_block(#[zyn(input)] item: zyn::syn::ItemStruct) -> zyn::TokenStream
{
let mut strcut_item = item.clone();
let (impl_generics, type_generics, where_clause) = item.generics.split_for_impl();
let fields = &item.fields.as_named().unwrap().named;
// Get state fields
let mut state_fields = vec![];
for field in strcut_item.fields.iter_mut()
{
let attr_index = field.attrs.iter().enumerate().find_map(|(i, attr)| {
if attr.is("sync_state") { Some(i) } else { None }
});
if let Some(state_field) = &field.ident
&& let Some(attr_index) = attr_index
{
state_fields.push(state_field.clone());
field.attrs.remove(attr_index);
}
}
zyn::zyn!(
{{ strcut_item }}
impl {{ impl_generics }} oxydsp_flowgraph::block::Block for {{ strcut_item.ident }} {{ type_generics }}
where {{ where_clause }}
{
fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult
{
let mut len = usize::MAX;
@for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("input"))).enumerate())
{
let mut {{ field.1.ident.clone().unwrap() | ident:"{}_reader" }} = self.{{field.1.ident}}.read();
len = len.min({{ field.1.ident.clone().unwrap() | ident:"{}_reader" }}.len());
}
@for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("output"))).enumerate())
{
let mut {{ field.1.ident.clone().unwrap() | ident: "{}_writer" }} = self.{{field.1.ident}}.write();
len = len.min({{ field.1.ident.clone().unwrap() | ident: "{}_writer" }}.len());
}
for _ in 0..len
{
if let Some((
@for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("output"))).enumerate())
{
{{ field.1.ident.clone().unwrap() | ident: "{}_out" }},
}
)) = Self::sync_work(
(
@for (state_field in state_fields)
{
&mut self.{{ state_field }},
}
),
(
@for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("input"))).enumerate())
{
{{ field.1.ident.clone().unwrap() | ident: "{}_reader" }}.pop().unwrap(),
}
)
)
{
@for (field in fields.iter().filter(|x| x.attrs.iter().any(|x| x.is("output"))).enumerate())
{
let _ = {{ field.1.ident.clone().unwrap() | ident: "{}_writer" }}.push({{ field.1.ident.clone().unwrap() | ident: "{}_out" }});
}
} else
{
return oxydsp_flowgraph::block::BlockResult::Terminated;
}
}
oxydsp_flowgraph::block::BlockResult::Ok
}
}
)
}
// Generate
#[proc_macro]
pub fn generate_pop_iterable_tuple_impl(input: TokenStream) -> TokenStream
@ -373,7 +317,7 @@ pub fn generate_pop_iterable_tuple_impl(input: TokenStream) -> TokenStream
type Output = (
@for (i in 0..count)
{
StreamReader<'a, {{ generics[i] }}>,
InReader<'a, {{ generics[i] }}>,
}
);
fn pop_iter(&'a mut self) -> PopIter<Self::Output>
@ -457,14 +401,14 @@ pub fn impl_iterator_for_pop_iter_tuple(input: TokenStream) -> TokenStream
> Iterator for PopIter<(
@for (i in 0..count)
{
StreamReader<'a, {{ generics[i] }}>,
InReader<'a, {{ generics[i] }}>,
}
)>
{
type Item = (
@for (i in 0..count)
{
{{ generics[i] }},
Tagged<{{ generics[i] }}>,
}
);
fn next(&mut self) -> Option<Self::Item>
@ -490,3 +434,95 @@ pub fn impl_iterator_for_pop_iter_tuple(input: TokenStream) -> TokenStream
.to_token_stream()
.into()
}
// #[proc_macro]
// pub fn generate_push_iterable_tuple_impl(input: TokenStream) -> TokenStream
// {
// let count = parse_input!(input as Lit);
// let count: usize = match count
// {
// Lit::Int(lit_int) => lit_int.base10_parse::<usize>().unwrap(),
// _ =>
// {
// return zyn::syn::Error::new(count.span(), "Must be an integer")
// .to_compile_error()
// .into();
// }
// };
// let generics = [
// format_ident!("A"),
// format_ident!("B"),
// format_ident!("C"),
// format_ident!("D"),
// format_ident!("E"),
// format_ident!("F"),
// format_ident!("G"),
// format_ident!("H"),
// format_ident!("I"),
// format_ident!("J"),
// format_ident!("K"),
// format_ident!("L"),
// format_ident!("M"),
// format_ident!("N"),
// format_ident!("O"),
// format_ident!("P"),
// format_ident!("Q"),
// format_ident!("R"),
// format_ident!("S"),
// format_ident!("T"),
// format_ident!("U"),
// format_ident!("V"),
// format_ident!("W"),
// format_ident!("X"),
// format_ident!("Y"),
// format_ident!("Z"),
// ];
//
// let iterator_item = zyn::zyn!(
// (
// @for (i in 0..count)
// {
// ({{ generics[i] }}, Option<Tag>),
// }
// )
// )
// .to_token_stream();
//
// zyn::zyn!(
// impl<'a,
// @for (i in 0..count)
// {
// {{ generics[i] }}: 'static,
// }
// > PushIterable<'a, {{ iterator_item }}> for (
// @for (i in 0..count)
// {
// &mut Out<{{ generics[i] }}>,
// }
// )
// {
// fn push_iter<I: Iterator<Item = {{ iterator_item }}>>(
// &'a mut self,
// mut iter: I,
// ) -> bool
// {
// @for (i in 0..count)
// {
// let {{ i | ident:"writer_{}" }} = self.{{ Index::from(i) }}.write();
// }
// let len = [
// @for (i in 0..count)
// {
// {{ i | ident:"reader_{}" }}.len(),
// }
// ].into_iter().min().unwrap();
//
// for _ in 0..len
// {
// if let Some()
// }
// }
// }
// )
// .to_token_stream()
// .into()
// }

View File

@ -0,0 +1,523 @@
use zyn::Fields;
use zyn::FromInput;
use zyn::ToTokens;
use zyn::ast::at;
use zyn::ext::AttrExt;
use zyn::ext::FieldsExt;
use zyn::ext::TypeExt;
use zyn::quote::quote;
use zyn::syn::Field;
use zyn::syn::GenericParam;
use zyn::syn::Lifetime;
use zyn::syn::parse_quote;
use crate::SyncBlockConfig;
// Sync block
#[zyn::element]
pub fn sync_block_impl(item: zyn::syn::ItemStruct, config: SyncBlockConfig) -> zyn::TokenStream
{
zyn::zyn!(
{{ item }}
// module to keep everything clean
mod {{ item.ident | snake | ident: "{}_synchronous_block" }}
{
use super::*;
@sync_block_view_struct(item = item.clone())
}
@sync_block_syncio_impl(item = item.clone(), config = *config)
@sync_block_impl_block(item = item.clone(), tagged = config.tagged)
)
}
// Block implementation for sync block
#[zyn::element]
fn sync_block_syncio_impl(item: zyn::syn::ItemStruct, config: SyncBlockConfig) -> zyn::TokenStream
{
let view_lifetime: GenericParam = parse_quote!('view);
let mut view_generics = item.generics.clone();
view_generics.params.iter_mut().for_each(|x| match x
{
GenericParam::Lifetime(_) =>
{}
GenericParam::Type(type_param) => type_param
.bounds
.push(zyn::syn::TypeParamBound::Lifetime(parse_quote!('view))),
GenericParam::Const(_) =>
{}
});
view_generics.params.insert(0, view_lifetime);
let (view_impl_generics, view_type_generics, view_where_clause) =
view_generics.split_for_impl();
let (_impl_generics, type_generics, _where_clause) = item.generics.split_for_impl();
zyn::zyn!(
impl {{ view_impl_generics }} oxydsp_flowgraph::block::SyncBlockIO<'view> for {{ item.ident }} {{ type_generics }}
{{ view_where_clause }}
{
// Path within module
type StateView = {{ item.ident | snake | ident: "{}_synchronous_block" }}::{{ item.ident | ident:"{}View" }} {{ view_type_generics }};
type Input = {{ sync_block_io_types(item.clone(), "input", config.tagged) }};
type Output = {{ sync_block_io_types(item.clone(), "output", config.tagged) }};
}
)
}
// Input/Output types for block
fn sync_block_io_types(
item: zyn::syn::ItemStruct,
io: &'static str,
tagged: bool,
) -> zyn::TokenStream
{
let field_types = item
.fields
.as_named()
.unwrap()
.named
.iter()
.filter(|f| f.attrs.iter().any(|attr| attr.is(io)))
.map(|x| x.ty.clone())
.map(|ty| {
match ty
.as_path()
.unwrap()
.segments
.last()
.unwrap()
.arguments
.clone()
{
zyn::syn::PathArguments::AngleBracketed(args) =>
{
let args = args.args.to_token_stream();
if tagged
{
quote!(oxydsp_flowgraph::tag::Tagged<#args>).into_token_stream()
}
else
{
args
}
}
zyn::syn::PathArguments::None => panic!(),
zyn::syn::PathArguments::Parenthesized(_) => panic!(),
}
})
.collect::<Vec<_>>();
zyn::zyn!(
@if (field_types.is_empty())
{
()
} @else if (field_types.len() == 1)
{
{{ field_types[0].clone() }}
} @else
{
(
@for (x in field_types.iter())
{
{{ x }},
}
)
}
)
.into_token_stream()
}
// View struct for sync block
#[zyn::element]
fn sync_block_view_struct(item: zyn::syn::ItemStruct) -> zyn::TokenStream
{
// Create view liftime to add to struct definition
let lifetime: GenericParam = parse_quote!('view);
let mut generics = item.generics.clone();
generics.params.iter_mut().for_each(|x| match x
{
GenericParam::Lifetime(_) =>
{}
GenericParam::Type(type_param) => type_param
.bounds
.push(zyn::syn::TypeParamBound::Lifetime(parse_quote!('view))),
GenericParam::Const(_) =>
{}
});
generics.params.insert(0, lifetime);
let (impl_generics, _type_generics, where_clause) = generics.split_for_impl();
let fields = &item.fields.as_named().unwrap().named;
let mut state_fields = vec![];
for field in fields.iter()
{
let mut f = field.clone();
if f.attrs.iter().any(|x| x.is("input") || x.is("output"))
{
continue;
}
let tk = field.ty.clone().into_token_stream();
f.ty = parse_quote!(&'view mut #tk);
state_fields.push(f);
}
let phantom_types = generics
.params
.iter()
.map(|param| {
match param
{
GenericParam::Type(t) =>
{
let ident = &t.ident;
quote!(#ident)
}
GenericParam::Lifetime(l) =>
{
let lifetime = &l.lifetime;
// Lifetimes need to be wrapped in a reference or similar
quote!(& #lifetime ())
}
GenericParam::Const(_) =>
{
// Const generics generally don't need PhantomData
// as they don't affect variance or drop-check.
quote!()
}
}
})
.filter(|tokens| !tokens.is_empty());
zyn::zyn!(
pub struct {{ item.ident | ident:"{}View" }} {{ impl_generics }}
{{ where_clause }}
{
@for (field in state_fields.iter())
{
pub {{ field }},
}
pub _sync_block_phantom: std::marker::PhantomData<(
@for (ty in phantom_types)
{
{{ ty }},
}
)>,
}
)
}
// Instantiates the view struct
fn sync_block_make_view_struct(item: zyn::syn::ItemStruct) -> zyn::TokenStream
{
let fields = &item.fields.as_named().unwrap().named;
let mut state_fields = vec![];
for field in fields.iter()
{
let mut f = field.clone();
if f.attrs.iter().any(|x| x.is("input") || x.is("output"))
{
continue;
}
let tk = field.ty.clone().into_token_stream();
f.ty = parse_quote!(&'view mut #tk);
state_fields.push(f);
}
zyn::zyn!(
{{item.ident | snake | ident:"{}_synchronous_block" }}::{{ item.ident | ident:"{}View" }} {
@for (field in state_fields)
{
{{field.ident}}: &mut self.{{ field.ident }},
}
_sync_block_phantom: Default::default(),
}
)
.into_token_stream()
}
// Impl Block for syncio
#[zyn::element]
fn sync_block_impl_block(item: zyn::syn::ItemStruct, tagged: bool) -> zyn::TokenStream
{
let item2 = item.clone();
let (impl_generics, type_generics, where_clause) = item2.generics.split_for_impl();
let fields = &item.fields.as_named().unwrap().named;
// Retrieve fields
let input_fields = fields
.iter()
.filter(|f| f.attrs.iter().any(|attr| attr.is("input")))
.cloned()
.collect::<Vec<_>>();
let output_fields = fields
.iter()
.filter(|f| f.attrs.iter().any(|attr| attr.is("output")))
.cloned()
.collect::<Vec<_>>();
zyn::zyn!(
impl {{ impl_generics }} oxydsp_flowgraph::block::Block for {{ item.ident }} {{ type_generics }}
{{ where_clause }}
{
fn work(&mut self) -> oxydsp_flowgraph::block::BlockResult
{
use oxydsp_flowgraph::tag::TagMergable;
// Get writers from outputs
let mut max_len = usize::MAX;
@for (out_field in output_fields.iter())
{
let {{ out_field.ident.clone().unwrap() | ident: "{}_writer"}} = self.{{ out_field.ident }}.write();
}
// Compute max_len
let max_len = *([
usize::MAX,
@for (out_field in output_fields.iter())
{
{{ out_field.ident.clone().unwrap() | ident: "{}_writer"}}.len(),
}
].iter().min().unwrap());
@if (!input_fields.is_empty())
{
@sync_block_block_impl_with_inputs(item = item.clone(), input_fields = input_fields.clone(), output_fields = output_fields.clone(), tagged = *tagged)
}
@else
{
@sync_block_block_impl_without_inputs(item = item.clone(), output_fields = output_fields.clone(), tagged = *tagged)
}
oxydsp_flowgraph::block::BlockResult::Ok
}
}
)
}
// Impl Block for syncio (no inputs)
#[zyn::element]
fn sync_block_block_impl_without_inputs(
item: zyn::syn::ItemStruct,
output_fields: Vec<Field>,
tagged: bool,
) -> zyn::TokenStream
{
zyn::zyn!(
for _ in 0..max_len
{
// Get outputs
let state = {{ sync_block_make_view_struct(item.clone()) }};
let @sync_block_output_descons(output_fields = output_fields.clone(), tagged = *tagged) =
<Self as oxydsp_flowgraph::block::SyncBlock>::sync_work(state, ()).unwrap();
// Now the output samples must be sent to their resepective outputs
@for (out_field in output_fields.iter())
{
@if (*tagged)
{
let _ = {{ out_field.ident.clone().unwrap() | ident: "{}_writer"}}.push(
oxydsp_flowgraph::tag::Tagged(
{{ out_field.ident.clone().unwrap() | ident: "{}_element"}},
{{ out_field.ident.clone().unwrap() | ident: "{}_tag_opt"}},
)
);
} @else
{
let _ = {{ out_field.ident.clone().unwrap() | ident: "{}_writer"}}.push(
oxydsp_flowgraph::tag::Tagged(
{{ out_field.ident.clone().unwrap() | ident: "{}_element"}},
None,
)
);
}
}
}
)
}
// Impl Block for syncio (with inputs)
#[zyn::element]
fn sync_block_block_impl_with_inputs(
item: zyn::syn::ItemStruct,
input_fields: Vec<Field>,
output_fields: Vec<Field>,
tagged: bool,
) -> zyn::TokenStream
{
zyn::zyn!(
use oxydsp_flowgraph::io::PopIterable;
// Iterate on inputs
(
@for (in_field in input_fields.iter())
{
&mut self.{{ in_field.ident }},
}
).pop_iter()
.take(max_len)
.for_each(
// Deconstruct foreach arguments
|
(@for (in_field in input_fields.iter())
{
oxydsp_flowgraph::tag::Tagged({{in_field.ident.clone().unwrap() | ident:"{}_element"}},
{{in_field.ident.clone().unwrap() | ident:"{}_tag_opt"}}),
})
|
{
// Create output tag
let common_tag = oxydsp_flowgraph::tag::Tag::merge_tag_opts([
@for (in_field in input_fields.iter())
{
{{in_field.ident.clone().unwrap() | ident:"{}_tag_opt"}}.clone(),
}
]);
let state = {{ sync_block_make_view_struct(item.clone()) }};
// Compute output sample
let @sync_block_output_descons(output_fields = output_fields.clone(), tagged = *tagged)
= <Self as oxydsp_flowgraph::block::SyncBlock>::sync_work(state, @sync_block_input_cons(input_fields = input_fields.clone(), tagged = *tagged)).unwrap();
// Now the output samples must be sent to their resepective outputs
@for (out_field in output_fields.iter())
{
@if (*tagged)
{
let _ = {{ out_field.ident.clone().unwrap() | ident: "{}_writer"}}.push(
oxydsp_flowgraph::tag::Tagged(
{{ out_field.ident.clone().unwrap() | ident: "{}_element"}},
{{ out_field.ident.clone().unwrap() | ident: "{}_tag_opt"}}.merge(&common_tag),
)
);
} @else
{
let _ = {{ out_field.ident.clone().unwrap() | ident: "{}_writer"}}.push(
oxydsp_flowgraph::tag::Tagged(
{{ out_field.ident.clone().unwrap() | ident: "{}_element"}},
common_tag,
)
);
}
}
//
}
);
)
}
#[zyn::element]
fn sync_block_output_descons(output_fields: Vec<Field>, tagged: bool) -> zyn::TokenStream
{
#[allow(clippy::collapsible_else_if)]
if *tagged
{
// If tagged : deconstruct tags
if output_fields.is_empty()
{
zyn::zyn!(_)
}
else if output_fields.len() == 1
{
zyn::zyn!(
oxydsp_flowgraph::tag::Tagged({{output_fields[0].ident.clone().unwrap() | ident:"{}_element"}}, {{output_fields[0].ident.clone().unwrap() | ident:"{}_tag_opt"}})
)
}
else
{
zyn::zyn!(
(@for (in_field in output_fields.iter())
{
oxydsp_flowgraph::tag::Tagged({{in_field.ident.clone().unwrap() | ident:"{}_element"}}, {{in_field.ident.clone().unwrap() | ident:"{}_tag_opt"}}.clone()),
}
)
)
}
}
else
{
// Otherwise just get the output element
if output_fields.is_empty()
{
zyn::zyn!(_)
}
else if output_fields.len() == 1
{
zyn::zyn!(
{{output_fields[0].ident.clone().unwrap() | ident:"{}_element"}}
)
}
else
{
zyn::zyn!(
(@for (in_field in output_fields.iter())
{
{{in_field.ident.clone().unwrap() | ident:"{}_element"}},
}
)
)
}
}
}
#[zyn::element]
fn sync_block_input_cons(input_fields: Vec<Field>, tagged: bool) -> zyn::TokenStream
{
#[allow(clippy::collapsible_else_if)]
if *tagged
{
// If tagged : deconstruct tags
if input_fields.is_empty()
{
zyn::zyn!(())
}
else if input_fields.len() == 1
{
zyn::zyn!(
oxydsp_flowgraph::tag::Tagged({{input_fields[0].ident.clone().unwrap() | ident:"{}_element"}}, {{input_fields[0].ident.clone().unwrap() | ident:"{}_tag_opt"}})
)
}
else
{
zyn::zyn!(
(@for (in_field in input_fields.iter())
{
oxydsp_flowgraph::tag::Tagged({{in_field.ident.clone().unwrap() | ident:"{}_element"}}, {{in_field.ident.clone().unwrap() | ident:"{}_tag_opt"}}.clone()),
}
)
)
}
}
else
{
// Otherwise just get the output element
if input_fields.is_empty()
{
zyn::zyn!(_)
}
else if input_fields.len() == 1
{
zyn::zyn!(
{{input_fields[0].ident.clone().unwrap() | ident:"{}_element"}}
)
}
else
{
zyn::zyn!(
(@for (in_field in input_fields.iter())
{
{{in_field.ident.clone().unwrap() | ident:"{}_element"}},
}
)
)
}
}
}

View File

@ -1,4 +1,6 @@
use crate::edge::{AnonymousStreamConsumer, AnonymousStreamProducer, BlockIOIndex};
use crate::edge::BlockIOIndex;
use crate::io::AnonymousStreamConsumer;
use crate::io::AnonymousStreamProducer;
pub enum BlockResult
{
@ -7,8 +9,10 @@ pub enum BlockResult
// Signifies that the block finished its work
// Running it again would be useless
// This triggers the graph shutdown
Terminated,
// Kill graph
Exit,
}
pub trait BlockIO
@ -46,13 +50,18 @@ pub trait Block
fn work(&mut self) -> BlockResult;
}
pub trait SyncBlock
// Represents the input, output, state types
// that a SyncBlock will have to interacti with
pub trait SyncBlockIO<'view>
{
type StateView;
type Input;
type Output;
type State;
}
fn sync_work(state: &mut Self::State, input: Self::Input) -> Option<Self::Output>;
pub trait SyncBlock<'view>: SyncBlockIO<'view>
{
fn sync_work(state: Self::StateView, input: Self::Input) -> Option<Self::Output>;
}
pub trait GraphableBlock: Block + BlockIO {}

View File

@ -1,16 +1,4 @@
use std::any::Any;
use std::collections::binary_heap::Iter;
use std::sync::Arc;
use std::sync::Mutex;
use oxydsp_flowgraph_macros::generate_pop_iterable_tuple_impl;
use oxydsp_flowgraph_macros::impl_iterator_for_pop_iter_tuple;
use crate::stream;
use crate::stream::StreamConsumer;
use crate::stream::StreamProducer;
use crate::stream::StreamReader;
use crate::stream::StreamWriter;
#[derive(Default)]
pub struct Edge
@ -30,238 +18,3 @@ pub struct BlockIOIndex
pub block_index: usize,
pub port_index: usize,
}
// Needed for graph to be able to manipulate
// stream endings without knowing the generic type
pub struct AnonymousStreamProducer
{
inner: Box<dyn Any>,
}
pub struct AnonymousStreamConsumer
{
inner: Box<dyn Any>,
}
impl<T: 'static> From<StreamProducer<T>> for AnonymousStreamProducer
{
fn from(value: StreamProducer<T>) -> Self
{
AnonymousStreamProducer {
inner: Box::new(value),
}
}
}
impl<T: 'static> From<StreamConsumer<T>> for AnonymousStreamConsumer
{
fn from(value: StreamConsumer<T>) -> Self
{
AnonymousStreamConsumer {
inner: Box::new(value),
}
}
}
impl AnonymousStreamProducer
{
pub fn downcast<T: 'static>(self) -> StreamProducer<T>
{
*self.inner.downcast::<StreamProducer<T>>().unwrap()
}
}
impl AnonymousStreamConsumer
{
pub fn downcast<T: 'static>(self) -> StreamConsumer<T>
{
*self.inner.downcast::<StreamConsumer<T>>().unwrap()
}
}
pub struct In<T>
{
stream: Option<StreamConsumer<T>>,
// Will rarely be accessed
edge: Arc<Mutex<Edge>>,
}
pub struct Out<T>
{
stream: Option<StreamProducer<T>>,
// Will rarely be accessed
edge: Arc<Mutex<Edge>>,
}
pub fn stream<T>() -> (Out<T>, In<T>)
{
let edge = Arc::new(Mutex::new(Edge::default()));
(
Out {
stream: None,
edge: edge.clone(),
},
In { stream: None, edge },
)
}
impl<T: 'static> In<T>
{
pub fn set_block_index(&self, index: BlockIOIndex)
{
self.edge.lock().unwrap().to = Some(index);
}
pub fn get_producer_block(&self) -> Option<BlockIOIndex>
{
self.edge.lock().unwrap().from
}
pub fn set_anonymous_stream(&mut self, consumer: AnonymousStreamConsumer)
{
self.stream = Some(consumer.downcast::<T>())
}
pub fn read<'a>(&'a mut self) -> StreamReader<'a, T>
{
self.stream.as_mut().unwrap().read()
}
}
impl<T: 'static> Out<T>
{
pub fn set_block_index(&self, index: BlockIOIndex)
{
self.edge.lock().unwrap().from = Some(index);
}
pub fn get_consumer_block(&self) -> Option<BlockIOIndex>
{
self.edge.lock().unwrap().to
}
pub fn set_anonymous_stream(&mut self, producer: AnonymousStreamProducer)
{
self.stream = Some(producer.downcast::<T>())
}
// Delegate stream creation to Out object
// which knows the stream type
pub fn create_anonymous_stream(
&self,
capacity: usize,
) -> (AnonymousStreamProducer, AnonymousStreamConsumer)
{
let (tx, rx) = stream::bounded_queue::<T>(capacity);
(tx.into(), rx.into())
}
pub fn write<'a>(&'a mut self) -> StreamWriter<'a, T>
{
self.stream.as_mut().unwrap().write()
}
pub fn push_iter<I: Iterator<Item = T>>(&mut self, mut iter: I) -> bool
{
let writer = self.write();
let len = writer.len();
for _ in 0..len
{
if let Some(elt) = iter.next()
{
let _ = writer.push(elt);
}
else
{
return false;
}
}
true
}
// Meta information
pub fn get_type_name(&self) -> &'static str
{
std::any::type_name::<T>()
}
}
pub struct PopIter<T>
{
len: usize,
popped: usize,
reader: T,
}
pub trait PopIterable<'a>
{
type Output;
fn pop_iter(&'a mut self) -> PopIter<Self::Output>;
}
impl<'a, T: 'static> PopIterable<'a> for In<T>
{
type Output = StreamReader<'a, T>;
fn pop_iter(&'a mut self) -> PopIter<StreamReader<'a, T>>
{
let reader = self.read();
PopIter {
popped: 0,
len: reader.len(),
reader,
}
}
}
generate_pop_iterable_tuple_impl! {2}
generate_pop_iterable_tuple_impl! {3}
generate_pop_iterable_tuple_impl! {4}
generate_pop_iterable_tuple_impl! {5}
generate_pop_iterable_tuple_impl! {6}
generate_pop_iterable_tuple_impl! {7}
generate_pop_iterable_tuple_impl! {8}
generate_pop_iterable_tuple_impl! {9}
generate_pop_iterable_tuple_impl! {10}
generate_pop_iterable_tuple_impl! {11}
generate_pop_iterable_tuple_impl! {12}
generate_pop_iterable_tuple_impl! {13}
generate_pop_iterable_tuple_impl! {14}
generate_pop_iterable_tuple_impl! {15}
generate_pop_iterable_tuple_impl! {16}
generate_pop_iterable_tuple_impl! {17}
generate_pop_iterable_tuple_impl! {18}
generate_pop_iterable_tuple_impl! {19}
generate_pop_iterable_tuple_impl! {20}
impl<'a, T> Iterator for PopIter<StreamReader<'a, T>>
{
type Item = T;
fn next(&mut self) -> Option<Self::Item>
{
self.reader.pop()
}
}
impl_iterator_for_pop_iter_tuple! {2}
impl_iterator_for_pop_iter_tuple! {3}
impl_iterator_for_pop_iter_tuple! {4}
impl_iterator_for_pop_iter_tuple! {5}
impl_iterator_for_pop_iter_tuple! {6}
impl_iterator_for_pop_iter_tuple! {7}
impl_iterator_for_pop_iter_tuple! {8}
impl_iterator_for_pop_iter_tuple! {9}
impl_iterator_for_pop_iter_tuple! {10}
impl_iterator_for_pop_iter_tuple! {11}
impl_iterator_for_pop_iter_tuple! {12}
impl_iterator_for_pop_iter_tuple! {13}
impl_iterator_for_pop_iter_tuple! {14}
impl_iterator_for_pop_iter_tuple! {15}
impl_iterator_for_pop_iter_tuple! {16}
impl_iterator_for_pop_iter_tuple! {17}
impl_iterator_for_pop_iter_tuple! {18}
impl_iterator_for_pop_iter_tuple! {19}
impl_iterator_for_pop_iter_tuple! {20}

View File

@ -0,0 +1,5 @@
// Represents a FlowGrahWide, simultaneous event
pub enum FlowGraphEvent
{
Kill(String),
}

View File

@ -49,20 +49,16 @@ impl FlowGraph
crate::block::BlockResult::Ok =>
{}
crate::block::BlockResult::Terminated =>
{ //break 'outer;
}
crate::block::BlockResult::Exit =>
{
println!("KILLING GRAPH");
break 'outer;
}
}
}
}
for _ in 0..10_000
{
for x in self.blocks.iter_mut()
{
x.work();
}
}
})
}

364
oxydsp-flowgraph/src/io.rs Normal file
View File

@ -0,0 +1,364 @@
use std::any::Any;
use std::sync::Arc;
use std::sync::Mutex;
use oxydsp_flowgraph_macros::generate_pop_iterable_tuple_impl;
use oxydsp_flowgraph_macros::impl_iterator_for_pop_iter_tuple;
use crate::edge::BlockIOIndex;
use crate::edge::Edge;
use crate::stream::StreamConsumer;
use crate::stream::StreamProducer;
use crate::stream::StreamReader;
use crate::stream::StreamWriter;
use crate::stream::{self};
use crate::tag::Tag;
use crate::tag::Tagged;
pub struct In<T>
{
stream: Option<StreamConsumer<T>>,
tag_stream: Option<StreamConsumer<Tag>>,
// Will rarely be accessed
edge: Arc<Mutex<Edge>>,
}
pub struct Out<T>
{
stream: Option<StreamProducer<T>>,
tag_stream: Option<StreamProducer<Tag>>,
// Will rarely be accessed
edge: Arc<Mutex<Edge>>,
}
pub struct InReader<'a, T>
{
data_reader: StreamReader<'a, T>,
tag_reader: StreamReader<'a, Tag>,
}
pub struct OutWriter<'a, T>
{
data_writer: StreamWriter<'a, T>,
tag_writer: StreamWriter<'a, Tag>,
}
pub fn stream<T>() -> (Out<T>, In<T>)
{
let edge = Arc::new(Mutex::new(Edge::default()));
(
Out {
stream: None,
tag_stream: None,
edge: edge.clone(),
},
In {
stream: None,
tag_stream: None,
edge,
},
)
}
impl<T: 'static> In<T>
{
pub fn set_block_index(&self, index: BlockIOIndex)
{
self.edge.lock().unwrap().to = Some(index);
}
pub fn get_producer_block(&self) -> Option<BlockIOIndex>
{
self.edge.lock().unwrap().from
}
pub fn set_anonymous_stream(&mut self, consumer: AnonymousStreamConsumer)
{
let (stream, tag_stream) = consumer.downcast::<T>();
self.stream = Some(stream);
self.tag_stream = Some(tag_stream);
}
pub fn read<'a>(&'a mut self) -> InReader<'a, T>
{
let data_reader = self.stream.as_mut().unwrap().read();
let tag_reader = self.tag_stream.as_mut().unwrap().read();
InReader {
data_reader,
tag_reader,
}
}
}
impl<T: 'static> Out<T>
{
pub fn set_block_index(&self, index: BlockIOIndex)
{
self.edge.lock().unwrap().from = Some(index);
}
pub fn get_consumer_block(&self) -> Option<BlockIOIndex>
{
self.edge.lock().unwrap().to
}
pub fn set_anonymous_stream(&mut self, producer: AnonymousStreamProducer)
{
let (stream, tag_stream) = producer.downcast::<T>();
self.stream = Some(stream);
self.tag_stream = Some(tag_stream);
}
// Delegate stream creation to Out object
// which knows the stream type
pub fn create_anonymous_stream(
&self,
capacity: usize,
) -> (AnonymousStreamProducer, AnonymousStreamConsumer)
{
let (tx, rx) = stream::bounded_queue::<T>(capacity);
let (tx_tag, rx_tag) = stream::bounded_queue::<Tag>(capacity);
((tx, tx_tag).into(), (rx, rx_tag).into())
}
pub fn write<'a>(&'a mut self) -> OutWriter<'a, T>
{
OutWriter {
data_writer: self.stream.as_mut().unwrap().write(),
tag_writer: self.tag_stream.as_mut().unwrap().write(),
}
}
pub fn push_iter<I: Iterator<Item = Tagged<T>>>(&mut self, mut iter: I) -> bool
{
let writer = self.write();
let len = writer.len();
for _ in 0..len
{
if let Some(elt) = iter.next()
{
let _ = writer.push(elt);
}
else
{
return false;
}
}
true
}
// Meta information
pub fn get_type_name(&self) -> &'static str
{
std::any::type_name::<T>()
}
}
impl<T> InReader<'_, T>
{
pub fn len(&self) -> usize
{
self.data_reader.len()
}
pub fn is_empty(&self) -> bool
{
self.len() == 0
}
pub fn pop(&self) -> Option<Tagged<T>>
{
let data = self.data_reader.pop_with_index();
if let Some((data, index)) = data
{
let mut tag = None;
if self
.tag_reader
.peek(|t| t.position)
.is_some_and(|x| x == index)
{
tag = self.tag_reader.pop();
}
Some((data, tag).into())
}
else
{
None
}
}
pub fn pop_untag(&self) -> Option<T>
{
self.pop().map(|data| data.into_inner())
}
}
impl<T> OutWriter<'_, T>
{
pub fn len(&self) -> usize
{
self.data_writer.len().min(self.tag_writer.len())
}
pub fn is_empty(&self) -> bool
{
self.len() == 0
}
pub fn push(&self, data: Tagged<T>) -> Result<(), Tagged<T>>
{
let (data, mut tag) = data.into();
let position = self.data_writer.next_index();
if let Some(tag) = &mut tag
{
tag.position = position
}
match self.data_writer.push(data)
{
Ok(_) if tag.is_some() =>
{
let _ = self.tag_writer.push(tag.unwrap());
Ok(())
}
Ok(_) => Ok(()),
Err(data) => Err((data, tag).into()),
}
}
pub fn push_no_tag(&self, data: T) -> Result<(), T>
{
self.data_writer.push(data)
}
}
pub struct PopIter<T>
{
len: usize,
popped: usize,
reader: T,
}
pub trait PopIterable<'a>
{
type Output;
fn pop_iter(&'a mut self) -> PopIter<Self::Output>;
}
impl<'a, T: 'static> PopIterable<'a> for In<T>
{
type Output = InReader<'a, T>;
fn pop_iter(&'a mut self) -> PopIter<InReader<'a, T>>
{
let reader = self.read();
PopIter {
popped: 0,
len: reader.len(),
reader,
}
}
}
generate_pop_iterable_tuple_impl! {1}
generate_pop_iterable_tuple_impl! {2}
generate_pop_iterable_tuple_impl! {3}
generate_pop_iterable_tuple_impl! {4}
generate_pop_iterable_tuple_impl! {5}
generate_pop_iterable_tuple_impl! {6}
generate_pop_iterable_tuple_impl! {7}
generate_pop_iterable_tuple_impl! {8}
generate_pop_iterable_tuple_impl! {9}
generate_pop_iterable_tuple_impl! {10}
generate_pop_iterable_tuple_impl! {11}
generate_pop_iterable_tuple_impl! {12}
impl<'a, T> Iterator for PopIter<InReader<'a, T>>
{
type Item = Tagged<T>;
fn next(&mut self) -> Option<Self::Item>
{
self.reader.pop()
}
}
impl_iterator_for_pop_iter_tuple! {1}
impl_iterator_for_pop_iter_tuple! {2}
impl_iterator_for_pop_iter_tuple! {3}
impl_iterator_for_pop_iter_tuple! {4}
impl_iterator_for_pop_iter_tuple! {5}
impl_iterator_for_pop_iter_tuple! {6}
impl_iterator_for_pop_iter_tuple! {7}
impl_iterator_for_pop_iter_tuple! {8}
impl_iterator_for_pop_iter_tuple! {9}
impl_iterator_for_pop_iter_tuple! {10}
impl_iterator_for_pop_iter_tuple! {11}
impl_iterator_for_pop_iter_tuple! {12}
// Needed for graph to be able to manipulate
// stream endings without knowing the generic type
pub struct AnonymousStreamProducer
{
inner: Box<dyn Any>,
inner_tag: StreamProducer<Tag>,
}
pub struct AnonymousStreamConsumer
{
inner: Box<dyn Any>,
inner_tag: StreamConsumer<Tag>,
}
impl<T: 'static> From<(StreamProducer<T>, StreamProducer<Tag>)> for AnonymousStreamProducer
{
fn from(value: (StreamProducer<T>, StreamProducer<Tag>)) -> Self
{
AnonymousStreamProducer {
inner: Box::new(value.0),
inner_tag: value.1,
}
}
}
impl<T: 'static> From<(StreamConsumer<T>, StreamConsumer<Tag>)> for AnonymousStreamConsumer
{
fn from(value: (StreamConsumer<T>, StreamConsumer<Tag>)) -> Self
{
AnonymousStreamConsumer {
inner: Box::new(value.0),
inner_tag: value.1,
}
}
}
impl AnonymousStreamProducer
{
pub fn downcast<T: 'static>(self) -> (StreamProducer<T>, StreamProducer<Tag>)
{
(
*self.inner.downcast::<StreamProducer<T>>().unwrap(),
self.inner_tag,
)
}
}
impl AnonymousStreamConsumer
{
pub fn downcast<T: 'static>(self) -> (StreamConsumer<T>, StreamConsumer<Tag>)
{
(
*self.inner.downcast::<StreamConsumer<T>>().unwrap(),
self.inner_tag,
)
}
}
// pub trait PushIterable<'a, T, I>
// where
// I: Iterator<Item = T>,
// {
// fn push_iter(&'a mut self, iter: I) -> bool;
// }

View File

@ -3,6 +3,10 @@
pub mod block;
pub mod edge;
pub mod event;
pub mod graph;
pub mod io;
pub mod stream;
pub use oxydsp_flowgraph_macros::{BlockIO, sync_block};
pub mod tag;
pub use oxydsp_flowgraph_macros::BlockIO;
pub use oxydsp_flowgraph_macros::sync_block;

View File

@ -1,5 +1,6 @@
use std::cell::Cell;
use std::cell::UnsafeCell;
use std::io::empty;
use std::mem::MaybeUninit;
use std::ops::Deref;
use std::sync::Arc;
@ -62,6 +63,11 @@ pub struct StreamWriter<'a, T>
first_len: usize,
second_len: usize,
written: Cell<usize>,
// Index of the first element to be pushed
// within the "infinite buffer"
// Used to number tags
start_index: usize,
}
// Represents a read operation within a stream producer
@ -73,6 +79,11 @@ pub struct StreamReader<'a, T>
first_len: usize,
second_len: usize,
read: Cell<usize>,
// Index of the first element to be read
// within the "infinite buffer"
// Used to number tags
start_index: usize,
}
pub fn bounded_queue<T>(capacity: usize) -> (StreamProducer<T>, StreamConsumer<T>)
@ -168,6 +179,8 @@ impl<T> StreamProducer<T>
&UnsafeCell<[MaybeUninit<T>]>,
>(start_to_tail));
StreamWriter {
start_index: head,
producer: self,
first,
second,
@ -224,6 +237,8 @@ impl<T> StreamProducer<T>
>(start_to_tail));
StreamWriter {
start_index: head,
producer: self,
first,
second,
@ -258,9 +273,12 @@ impl<T> StreamProducer<T>
// Head and tail are both indices of the slice
unsafe {
let k = &mut *self.inner.buffer.get();
let len = wrapped_tail - wrapped_head;
StreamWriter {
start_index: head,
producer: self,
first_len: wrapped_tail - wrapped_head,
first_len: len,
second_len: 0,
first: std::mem::transmute::<
&[MaybeUninit<T>],
@ -294,9 +312,12 @@ impl<T> StreamConsumer<T>
// Buffer is empty. Return empty slice
unsafe {
let k = &mut *self.inner.buffer.get();
let len = wrapped_head - wrapped_tail;
StreamReader {
start_index: tail,
producer: self,
first_len: wrapped_head - wrapped_tail,
first_len: len,
second_len: 0,
first: std::mem::transmute::<&[MaybeUninit<T>], &UnsafeCell<[MaybeUninit<T>]>>(
&k[wrapped_tail..wrapped_head],
@ -327,9 +348,12 @@ impl<T> StreamConsumer<T>
// Head and tail are both indices of the slice
unsafe {
let k = &mut *self.inner.buffer.get();
let len = wrapped_head - wrapped_tail;
StreamReader {
start_index: tail,
producer: self,
first_len: wrapped_head - wrapped_tail,
first_len: len,
second_len: 0,
first: std::mem::transmute::<
&[MaybeUninit<T>],
@ -385,6 +409,8 @@ impl<T> StreamConsumer<T>
>(start_to_head));
StreamReader {
start_index: tail,
producer: self,
first,
second,
@ -396,6 +422,90 @@ impl<T> StreamConsumer<T>
}
}
}
/// Creates a reader of contiguous elements that
/// satisfy the predicate
pub fn read_while<F>(&mut self, predicate: F) -> StreamReader<'_, T>
where
F: Fn(&T) -> bool,
{
// Take a normal reader. This contains available elements to read.
let mut reader = self.read();
// We need to trim the slices to keep only the satified elements
// First slice
let mut first_kept = 0;
// SAFETY:
//
// Only us can have a reference to these slices of the buffer
for element in unsafe { &*reader.first.get() }
{
// SAFETY
//
// If this element is in a reader returned by self.read
// with no pop called, we know it is initialized
let init_element = unsafe { element.assume_init_ref() };
let sat = predicate(init_element);
if !sat
{
// Stop here
// Forget about second slice
reader.second_len = 0;
reader.second = None;
// Trim first slice
reader.first_len = first_kept;
unsafe {
reader.first = std::mem::transmute::<
&[MaybeUninit<T>],
&UnsafeCell<[MaybeUninit<T>]>,
>(&(&*reader.first.get())[0..first_kept]);
}
return reader;
}
first_kept += 1;
}
// If we are here, all of the elements of the first slice, satisfy the predicate
if let Some(second_slice) = &mut reader.second
{
// Second slice
let mut second_kept = 0;
// SAFETY:
//
// Only us can have a reference to these slices of the buffer
for element in unsafe { &*second_slice.get() }
{
// SAFETY
//
// If this element is in a reader returned by self.read
// with no pop called, we know it is initialized
let init_element = unsafe { element.assume_init_ref() };
let sat = predicate(init_element);
if !sat
{
// Stop here
// Trim second slice
reader.second_len = second_kept;
unsafe {
reader.second = Some(std::mem::transmute::<
&[MaybeUninit<T>],
&UnsafeCell<[MaybeUninit<T>]>,
>(
&(&*second_slice.get())[0..first_kept]
));
}
return reader;
}
second_kept += 1;
}
}
return reader;
}
}
impl<'a, T> StreamWriter<'a, T>
@ -410,6 +520,11 @@ impl<'a, T> StreamWriter<'a, T>
self.len() == 0
}
pub fn next_index(&self) -> usize
{
self.start_index + self.written.get()
}
pub fn push(&self, element: T) -> Result<(), T>
{
if self.written.get() < self.first_len
@ -449,6 +564,49 @@ impl<'a, T> StreamReader<'a, T>
self.len() == 0
}
pub fn last_index(&self) -> usize
{
self.start_index + self.len()
}
pub fn next_index(&self) -> usize
{
self.start_index + self.read.get()
}
pub fn pop_with_index(&self) -> Option<(T, usize)>
{
let index = self.next_index();
self.pop().map(|t| (t, index))
}
pub fn peek<F, O>(&self, peeker: F) -> Option<O>
where
F: Fn(&T) -> O,
{
// Same as pop, without taking, or increasing read count
if self.read.get() < self.first_len
{
// SAFETY:
//
// If element is in this slice, it is initialized.
// We take it once since read increases
let element = unsafe { (&mut *self.first.get())[self.read.get()].assume_init_ref() };
Some(peeker(element))
}
else if let Some(second) = &self.second
&& self.read.get() - self.first_len < self.second_len
{
let element =
unsafe { (&mut *second.get())[self.read.get() - self.first_len].assume_init_ref() };
Some(peeker(element))
}
else
{
None
}
}
pub fn pop(&self) -> Option<T>
{
if self.read.get() < self.first_len

216
oxydsp-flowgraph/src/tag.rs Normal file
View File

@ -0,0 +1,216 @@
use std::any::Any;
use std::collections::HashMap;
use std::ops::Deref;
use std::ops::DerefMut;
use std::sync::Arc;
use std::sync::Mutex;
// Tags a particular sample within a specific stream
#[derive(Clone)]
pub struct Tag
{
// Position of the sample this tag is tied to.
// The position is in terms of the stream front index when the
// sample was added
pub position: usize,
// TODO: Make it such that when a tag is duplicated, the data seems to be too:
// When adding on a duplicate, it should not replicate on others, but without
// requiring a deep copy.
pub data: Arc<Mutex<HashMap<String, Arc<dyn Any + Send + Sync>>>>,
}
impl Tag
{
pub fn new() -> Self
{
Self {
position: 0,
data: Default::default(),
}
}
pub fn merge_tag_opts<const N: usize>(tag_opts: [Option<Tag>; N]) -> Option<Tag>
{
let mut out_tag = None;
for tag in tag_opts.iter()
{
out_tag = out_tag.merge(tag);
}
out_tag
}
pub fn tag<T: 'static + Send + Sync>(&self, key: impl AsRef<str>, value: T)
{
self.data
.lock()
.unwrap()
.insert(key.as_ref().to_owned(), Arc::new(value));
}
pub fn retrieve(&self, key: impl AsRef<str>) -> Option<Arc<dyn Any + Send + Sync>>
{
self.data.lock().unwrap().get(key.as_ref()).cloned()
}
}
impl Default for Tag
{
fn default() -> Self
{
Self::new()
}
}
pub trait TagValue: Clone {}
impl<T> TagValue for T where T: Clone {}
pub trait TagMergable<T>
{
fn merge(&self, other: &T) -> Self;
}
impl TagMergable<Tag> for Tag
{
fn merge(&self, other: &Self) -> Self
{
// TODO: More performant merge
let mut new = other.clone();
new.position = self.position;
{
let mut data_locked = new.data.lock().unwrap();
for (k, v) in self.data.lock().unwrap().iter()
{
data_locked.insert(k.clone(), v.clone());
}
}
new
}
}
impl TagMergable<Option<Tag>> for Option<Tag>
{
fn merge(&self, other: &Self) -> Self
{
match self
{
Some(first) => match other
{
Some(other) => Some(first.merge(other)),
None => Some(first.clone()),
},
None => other.clone(),
}
}
}
impl<T: Clone> TagMergable<Option<Tag>> for Tagged<T>
{
fn merge(&self, other: &Option<Tag>) -> Self
{
Tagged::new(self.0.clone(), self.1.merge(other))
}
}
/// Represents a data, with a potential tag attached to it.
#[derive(Clone)]
pub struct Tagged<T>(pub T, pub Option<Tag>);
impl<T> Tagged<T>
{
pub fn new(inner: T, tag: Option<Tag>) -> Self
{
if tag.is_none()
{
//println!("data has no tag");
}
Self(inner, tag)
}
pub fn has_tag(&self) -> bool
{
self.1.is_some()
}
pub fn strip(&mut self)
{
self.1 = None;
}
pub fn into_inner(self) -> T
{
self.0
}
pub fn tag(&mut self, tag: Tag) -> Option<Tag>
{
let t = self.1.take();
self.1 = Some(tag);
t
}
}
impl<T: Clone> Tagged<T>
{
pub fn stripped(&self) -> Self
{
self.0.clone().into()
}
pub fn tagged(&self, tag: Tag) -> Self
{
(self.0.clone(), tag).into()
}
}
impl<T> From<T> for Tagged<T>
{
fn from(value: T) -> Self
{
Self::new(value, None)
}
}
impl<T> From<(T, Tag)> for Tagged<T>
{
fn from((value, tag): (T, Tag)) -> Self
{
Self::new(value, Some(tag))
}
}
impl<T> From<(T, Option<Tag>)> for Tagged<T>
{
fn from((value, tag): (T, Option<Tag>)) -> Self
{
Self::new(value, tag)
}
}
impl<T> From<Tagged<T>> for (T, Option<Tag>)
{
fn from(val: Tagged<T>) -> Self
{
(val.0, val.1)
}
}
impl<T> DerefMut for Tagged<T>
{
fn deref_mut(&mut self) -> &mut Self::Target
{
&mut self.0
}
}
impl<T> Deref for Tagged<T>
{
type Target = T;
fn deref(&self) -> &Self::Target
{
&self.0
}
}