Idk, i'll proly rebase

This commit is contained in:
2026-03-25 21:17:22 +01:00
parent b57b85f959
commit 4d548a7973
4 changed files with 174 additions and 131 deletions

View File

@ -1,3 +1,5 @@
use std::iter::FusedIterator;
use oxydsp_flowgraph::BlockIO;
use oxydsp_flowgraph::block::Block;
use oxydsp_flowgraph::block::BlockResult;
@ -395,3 +397,106 @@ impl<T: 'static + Clone> Block for Tee<T>
BlockResult::Ok
}
}
#[derive(BlockIO)]
pub struct FlatMap<I, O, F>
where
I: 'static,
O: IntoIterator + 'static,
O::IntoIter: FusedIterator,
F: Fn(I) -> O,
{
#[input]
input: In<I>,
#[output]
output: Out<O::Item>,
current_iter: Option<O::IntoIter>,
map: F,
}
impl<I, O, F> FlatMap<I, O, F>
where
I: 'static,
O: IntoIterator + 'static,
O::IntoIter: FusedIterator,
F: Fn(I) -> O,
{
pub fn new(input: In<I>, map: F) -> (Self, In<O::Item>)
{
let (output, port) = oxydsp_flowgraph::io::stream();
(
Self {
input,
output,
current_iter: None,
map,
},
port,
)
}
}
impl<I, O, F> Block for FlatMap<I, O, F>
where
I: 'static,
O: IntoIterator + 'static,
O::IntoIter: FusedIterator,
F: Fn(I) -> O,
{
fn work(&mut self) -> BlockResult
{
let writer = self.output.write();
let reader = self.input.read();
let max_write = writer.len();
let mut written = 0;
while written < max_write
{
if let Some(current_iter) = self.current_iter.as_mut()
{
if let Some(next_elt) = current_iter.next()
{
let _ = writer.push((next_elt, None).into());
written += 1;
continue;
}
else
{
// Iterator empty
self.current_iter = None;
}
}
if self.current_iter.is_none()
{
// Get input
if let Some(input) = reader.pop()
{
let mut new_iter = (self.map)(input.0).into_iter();
if let Some(first_elt) = new_iter.next()
{
self.current_iter = Some(new_iter);
let _ = writer.push((first_elt, input.1).into());
written += 1;
}
else
{
// Iterator empty
self.current_iter = None;
continue;
}
}
else
{
// Cannot continue
break;
}
}
}
BlockResult::Ok
}
}