UDP reception

This commit is contained in:
2025-10-09 17:56:51 +02:00
parent 391c5e5c7c
commit ebf6814651
4 changed files with 222 additions and 717 deletions

View File

@ -5,19 +5,27 @@ mod complex;
pub mod fft;
mod filtering;
mod iq;
mod math;
mod nco;
mod ted;
mod units;
mod windows;
mod ted;
mod math;
use rand::{rand_core::le, seq::index::sample};
use std::{collections::VecDeque, time::Duration};
use tokio::{join, select, time::timeout};
use tokio::{join, net::UdpSocket, select, time::timeout};
use crate::{
bfsk::BFSKMod,
complex::Complex32,
filtering::{dc_block::DCBlocker, fir::FIRFilter},
iq::IQSampler,
nco::Nco,
ted::elg::ELGate,
units::frequency::hz_to_rad_per_sample,
};
use eframe::{egui, glow::SAMPLE_MASK_VALUE};
use tokio::sync::mpsc::{Receiver, Sender, channel};
use crate::{bfsk::BFSKMod, complex::Complex32, filtering::{dc_block::DCBlocker, fir::FIRFilter}, iq::IQSampler, nco::Nco, ted::elg::ELGate, units::frequency::hz_to_rad_per_sample};
const BAUD_RATE: u32 = 1000;
const SAMPLE_RATE: u32 = 48000;
@ -26,112 +34,120 @@ const SAMPLE_RATE: u32 = 48000;
const CENTER_FREQ: f32 = 1700.;
const DEVIATION: f32 = 500.;
pub trait SampleSender
{
pub trait SampleSender {
fn open_link(&mut self);
fn send_samples(&mut self, samples: &[f32]);
fn close_link(&mut self);
}
struct Transceiver
{
data_receiver: Receiver<Vec<u8>>,
data_sender: Sender<Vec<u8>>,
samples_sender: Sender<f32>,
}
struct Transceiver {}
impl Transceiver
{
pub async fn start<T: SampleSender>(sample_sender: T) -> Self
{
let (transmitter_tx, transmitter_rx) = channel::<Vec<u8>>(4096);
let (acknowledged_tx, acknowledged_rx) = channel::<()>(32);
let (ack_tx, ack_rx) = channel::<()>(32);
let (samples_tx, samples_rx) = channel::<f32>(4096);
let (receiver_tx, receiver_rx) = channel::<Vec<u8>>(4096);
join!(
Self::transmitter(acknowledged_rx, transmitter_rx, ack_rx, sample_sender),
Self::receiver(acknowledged_tx, samples_rx, receiver_tx, ack_tx)
);
Self
{
data_receiver: receiver_rx,
data_sender: transmitter_tx,
samples_sender: samples_tx
}
}
async fn transmitter<T: SampleSender>(mut acknowledged: Receiver<()>, mut data_receiver: Receiver<Vec<u8>>, mut ack_receiver: Receiver<()>, mut samples_sender: T)
{
let mut send_queue: VecDeque<Frame> = VecDeque::new();
loop
{
if !send_queue.is_empty()
{
let to_send = send_queue.pop_front().unwrap();
// Create modulation
let bytes = to_send.bytes();
let mut bit_stream = bytes.iter().flat_map(|x| byte_to_bits(*x));
let mut modulator = BFSKMod::new((SAMPLE_RATE as f32 / BAUD_RATE as f32).round() as u32,
hz_to_rad_per_sample(DEVIATION, SAMPLE_RATE as f32),
&mut bit_stream);
let mut up_lo = Nco::new(hz_to_rad_per_sample(CENTER_FREQ, SAMPLE_RATE as f32));
let mut sample_buffer = vec![];
for (m, up) in modulator.zip(up_lo)
impl Transceiver {
pub async fn start<T: SampleSender>(
mut sample_stream: Receiver<f32>,
mut tx_stream: Receiver<Vec<u8>>,
mut rx_stream: Sender<Vec<u8>>,
sample_sender: &mut T,
) {
let mut resend: Option<Vec<u8>> = None;
loop {
select! {
_ = Self::squelch_detector(&mut sample_stream) =>
{
let sample = m * up;
sample_buffer.push(sample.re); // Project IQ
}
samples_sender.open_link();
samples_sender.send_samples(&sample_buffer);
samples_sender.close_link();
if let Frame::Data(_) = to_send
{
// Wait for ack
while !acknowledged.is_empty()
println!("Squelch UP");
select!
{
let _ = acknowledged.blocking_recv();
x = Self::receive(&mut sample_stream) =>
{
match x
{
Err(()) => {continue;},
Ok(Frame::Ack) =>
{
resend = None;
}
Ok(Frame::Data(data)) =>
{
rx_stream.send(data).await.unwrap();
Self::transmit(Frame::Ack, sample_sender).await;
}
}
},
_ = tokio::time::sleep(Duration::from_secs(2)) => {continue;}, //TODO: 65
//sec
//timeout
}
let ack_timout = timeout(Duration::from_secs(2), acknowledged.recv()).await;
if let Ok(Some(())) = ack_timout
}, // End squelch
data_opt = async
{
tokio::time::sleep(Duration::from_secs(2)).await;
if let Some(resend_data) = resend.clone()
{
// ACK Received : Ok
Some(resend_data)
}
else
{
// Try again
send_queue.push_front(to_send);
tx_stream.recv().await
}
}
}
else
{
let new = select!
=>
{
Some(x) = data_receiver.recv() => Frame::Data(x),
Some(()) = ack_receiver.recv() => Frame::Ack,
};
match new
{
Frame::Ack => send_queue.push_front(Frame::Ack), // Highest importance
Frame::Data(x) => send_queue.push_back(Frame::Data(x))
if let Some(data) = data_opt
{
Self::transmit(Frame::Data(data.clone()), sample_sender).await;
resend = Some(data);
}
}
}
}
}
async fn receiver(acknowledged: Sender<()>, mut samples: Receiver<f32>, data_sender: Sender<Vec<u8>>, ack_sender: Sender<()>)
{
async fn squelch_detector(sample_stream: &mut Receiver<f32>) {
let length = 500;
let level = 0.01;
let mut iq_sampler = IQSampler::new(hz_to_rad_per_sample(CENTER_FREQ, SAMPLE_RATE as f32));
let mut squelch_sum = 0.;
let mut i = 0;
while let Some(smpl) = sample_stream.recv().await {
println!("sdkf");
let iq = iq_sampler.sample(smpl);
squelch_sum += iq.mag() / length as f32;
i += 1;
if i >= length {
if squelch_sum >= level {
return;
}
} else {
i = 0;
squelch_sum = 0.;
}
}
}
async fn transmit<T: SampleSender>(frame: Frame, samples_sender: &mut T) {
let bytes = frame.bytes();
let mut bit_stream = bytes.iter().flat_map(|x| byte_to_bits(*x));
let modulator = BFSKMod::new(
(SAMPLE_RATE as f32 / BAUD_RATE as f32).round() as u32,
hz_to_rad_per_sample(DEVIATION, SAMPLE_RATE as f32),
&mut bit_stream,
);
let up_lo = Nco::new(hz_to_rad_per_sample(CENTER_FREQ, SAMPLE_RATE as f32));
let mut sample_buffer = vec![];
for (m, up) in modulator.zip(up_lo) {
let sample = m * up;
sample_buffer.push(sample.re); // Project IQ
}
samples_sender.open_link();
samples_sender.send_samples(&sample_buffer);
samples_sender.close_link();
}
async fn receive(sample_stream: &mut Receiver<f32>) -> Result<Frame, FrameConstructionError> {
let mut iq_sampler = IQSampler::new(hz_to_rad_per_sample(CENTER_FREQ, SAMPLE_RATE as f32));
let samples_per_symbol = (SAMPLE_RATE as f32) / (BAUD_RATE as f32);
@ -139,12 +155,19 @@ impl Transceiver
let correllator_length = samples_per_symbol as usize;
let mut pos_nco = Nco::new(hz_to_rad_per_sample(DEVIATION, SAMPLE_RATE as f32));
let mut neg_nco = Nco::new(hz_to_rad_per_sample(-DEVIATION, SAMPLE_RATE as f32));
let pos_ir = (0..correllator_length).map(|_| {pos_nco.step(); pos_nco.cexp()});
let neg_ir = (0..correllator_length).map(|_| {neg_nco.step(); neg_nco.cexp()});
let pos_ir = (0..correllator_length).map(|_| {
pos_nco.step();
pos_nco.cexp()
});
let neg_ir = (0..correllator_length).map(|_| {
neg_nco.step();
neg_nco.cexp()
});
let mut pos_correllator = FIRFilter::new(&pos_ir.collect::<Vec<_>>());
let mut neg_correllator = FIRFilter::new(&neg_ir.collect::<Vec<_>>());
let mut matched_lowpass = FIRFilter::new(&vec![Complex32::new(1., 0.); samples_per_symbol as usize]);
let mut matched_lowpass =
FIRFilter::new(&vec![Complex32::new(1., 0.); samples_per_symbol as usize]);
let mut dc_block = DCBlocker::new(0.995);
let loop_i = 0.1;
@ -157,117 +180,100 @@ impl Transceiver
let mut last_byte = 0x00u8;
let mut frame_constructor = FrameConstructor::new();
let mut bit_count: Option<u32> = None;
while let Some(sample) = samples.recv().await
{
let iq = iq_sampler.sample(sample);
let matched = dc_block.next_real(matched_lowpass.next_real(pos_correllator.next(iq).mag() - neg_correllator.next(iq).mag()));
if let Some(bit_sample) = elg.next(matched)
{
last_byte <<= 1;
while let Some(sample) = sample_stream.recv().await {
let iq = iq_sampler.sample(sample);
let matched =
dc_block
.next_real(matched_lowpass.next_real(
pos_correllator.next(iq).mag() - neg_correllator.next(iq).mag(),
));
if let Some(bit_sample) = elg.next(matched) {
last_byte <<= 1;
last_byte |= (bit_sample > 0.) as u8;
bit_count = bit_count.map(|x| x + 1);
if last_byte == 0xD8 // Potential frame starts
if last_byte == 0xD8
// Potential frame starts
{
last_byte = 0;
frame_constructor = FrameConstructor::new();
bit_count = Some(0);
}
if let Some(8) = bit_count
{
if let Some(8) = bit_count {
let frame_opt = frame_constructor.add_byte(last_byte);
if let Ok(None) = frame_opt
{
bit_count = Some(0);
bit_count = Some(0);
if let Ok(Some(Frame::Ack)) = frame_opt {
return Ok(Frame::Ack);
}
if let Ok(Some(Frame::Ack)) = frame_opt
{
bit_count = None;
acknowledged.send(()).await.unwrap(); // Send acknowledgement to transmitter
if let Ok(Some(Frame::Data(ref frame_data))) = frame_opt {
return Ok(Frame::Data(frame_data.to_vec()));
}
if let Ok(Some(Frame::Data(ref frame_data))) = frame_opt
{
bit_count = None;
data_sender.send(frame_data.to_vec()).await.unwrap();
ack_sender.send(()).await.unwrap();
}
if let Err(()) = frame_opt
{
bit_count = None; // Erroneous frame, ignore
if let Err(()) = frame_opt {
// Erroneous frame
return Err(());
}
}
}
}
return Err(());
}
}
enum Frame
{
enum Frame {
Data(Vec<u8>),
Ack
Ack,
}
type FrameConstructionError = ();
pub struct FrameConstructor
{
pub struct FrameConstructor {
frame: Vec<u8>,
frame_countdown: Option<u16>,
checksum: u8,
}
impl FrameConstructor
{
pub fn new() -> Self
{
Self
{
impl FrameConstructor {
pub fn new() -> Self {
Self {
frame: Vec::new(),
frame_countdown: None,
checksum: 0u8,
}
}
pub fn add_byte(&mut self, byte: u8) -> Result<Option<Frame>, FrameConstructionError>
{
if self.frame.is_empty() && byte != 0xC4 && byte != 0x4C
{
return Err(());
pub fn add_byte(&mut self, byte: u8) -> Result<Option<Frame>, FrameConstructionError> {
if self.frame.is_empty() && byte != 0xC4 && byte != 0x4C {
return Err(());
}
if self.frame.is_empty() && byte == 0xC4
{
if self.frame.is_empty() && byte == 0xC4 {
return Ok(Some(Frame::Ack));
}
if self.frame.is_empty() && byte == 0x4C
{
if self.frame.is_empty() && byte == 0x4C {
return Ok(None);
}
self.frame.push(byte);
// Retrieve length
if self.frame.len() == 1
{
if self.frame.len() == 1 {
self.frame_countdown = Some(self.frame[0] as u16);
return Ok(None);
}
if self.frame.len() == 2
{
if self.frame.len() == 2 {
*self.frame_countdown.as_mut().unwrap() |= (self.frame[1] as u16) << 8;
return Ok(None);
}
if self.frame_countdown.unwrap() == 0
{
if self.frame_countdown.unwrap() == 0 {
// All data has been received
if self.checksum == byte
{
return Ok(Some(Frame::Data(self.frame.iter().skip(2).copied().collect())));
if self.checksum == byte {
return Ok(Some(Frame::Data(
self.frame.iter().skip(2).copied().collect(),
)));
}
return Err(());
@ -281,12 +287,10 @@ impl FrameConstructor
}
}
impl Frame
{
pub fn bytes(&self) -> Vec<u8>
{
impl Frame {
pub fn bytes(&self) -> Vec<u8> {
let mut output_bytes = vec![];
// Initial training sequence
output_bytes.append(&mut vec![0b01010101; 64]);
@ -294,13 +298,11 @@ impl Frame
output_bytes.push(0xD8);
// Command
match self
{
Frame::Data(x) =>
{
match self {
Frame::Data(x) => {
let mut checksum = 0u8;
x.iter().for_each(|x| checksum ^= x);
assert!(x.len() < 65536, "Data size over MTU");
let len_u16 = x.len() as u16;
output_bytes.push(0x4C); // DATA FRAME
@ -311,8 +313,7 @@ impl Frame
output_bytes.push(checksum);
}
Frame::Ack =>
{
Frame::Ack => {
output_bytes.push(0xC4); // ACK FRAME
}
}
@ -323,7 +324,8 @@ impl Frame
}
}
fn main() {
#[tokio::main]
async fn main() {
let native_options = eframe::NativeOptions::default();
let _ = eframe::run_native(
"Egui",
@ -332,26 +334,58 @@ fn main() {
);
}
//#[derive(Default)]
struct EguiApp {
struct EguiApp {}
struct DummySampleSender();
impl SampleSender for DummySampleSender {
fn open_link(&mut self) {}
fn send_samples(&mut self, samples: &[f32]) {}
fn close_link(&mut self) {}
}
impl EguiApp {
fn new(
_cc: &eframe::CreationContext<'_>,
) -> Self {
fn new(_cc: &eframe::CreationContext<'_>) -> Self {
let (sample_tx, sample_rx) = channel::<f32>(1024);
Self {
let (transmit_tx, transmit_rx) = channel::<Vec<u8>>(1024);
let (receive_tx, receive_rx) = channel::<Vec<u8>>(1024);
}
tokio::spawn(async move {
Transceiver::start(sample_rx, transmit_rx, receive_tx, &mut DummySampleSender()).await;
});
tokio::spawn(async move {
let sock = UdpSocket::bind("0.0.0.0:8080").await.unwrap();
let mut buf = [0u8; 1024];
let mut sample = 0i16;
let mut byte_index = 0;
loop {
let len = sock.recv(&mut buf).await.unwrap();
for x in buf.iter().take(len) {
sample |= (*x as i16) << (byte_index * 8);
byte_index += 1;
if byte_index >= 2 {
sample_tx
.send(sample as f32 / i16::MAX as f32)
.await
.unwrap();
byte_index = 0;
sample = 0i16;
}
}
}
});
Self {}
}
}
impl eframe::App for EguiApp {
fn update(&mut self, ctx: &egui::Context, _frame: &mut eframe::Frame) {
egui::CentralPanel::default().show(ctx, |_ui| {
});
egui::CentralPanel::default().show(ctx, |_ui| {});
}
}