// Source: hydro_lang/live_collections/sliced/mod.rs

1//! Utilities for transforming live collections via slicing.
2
3pub mod style;
4
5use super::boundedness::{Bounded, Unbounded};
6use super::stream::{Ordering, Retries};
7use crate::location::{Location, NoTick, Tick};
8
/// Implementation detail of `sliced!`: an incremental parser over the leading `let` statements.
///
/// Each recursive step peels one statement off the front of the input and appends it to one of
/// two accumulators:
/// - `@uses` collects `{ name, invocation, span-exprs... }` for every `let name = use...;`
/// - `@states` collects `{ name, style, ((type?), (arg?)) }` for every `let mut name = use::...;`
///
/// When none of the statement rules applies, a terminal rule treats the remaining tokens as the
/// body and emits the final expansion.
#[doc(hidden)]
#[macro_export]
macro_rules! __sliced_parse_uses__ {
    // Parse immutable use statements with style: let name = use::style(args...);
    // The whole `style(args...)` call is captured as a single expression; it doubles as the
    // span source for the backtrace generated in the terminal rule.
    (
        @uses [$($uses:tt)*]
        @states [$($states:tt)*]
        let $name:ident = use:: $invocation:expr; $($rest:tt)*
    ) => {
        $crate::__sliced_parse_uses__!(
            @uses [$($uses)* { $name, $invocation, $invocation }]
            @states [$($states)*]
            $($rest)*
        )
    };

    // Parse immutable use statements without style: let name = use(args...);
    // The call target is built by `copy_span!` from the arguments plus the `default` token
    // (presumably the `default` style with the caller's span -- confirm against `copy_span!`).
    (
        @uses [$($uses:tt)*]
        @states [$($states:tt)*]
        let $name:ident = use($($args:expr),* $(,)?); $($rest:tt)*
    ) => {
        $crate::__sliced_parse_uses__!(
            @uses [$($uses)* { $name, $crate::macro_support::copy_span::copy_span!($($args,)* default)($($args),*), $($args),* }]
            @states [$($states)*]
            $($rest)*
        )
    };

    // Parse mutable state statements: let mut name = use::style::<Type>(args);
    // Both the turbofish type and the single argument are optional.
    (
        @uses [$($uses:tt)*]
        @states [$($states:tt)*]
        let mut $name:ident = use:: $style:ident $(::<$ty:ty>)? ($($args:expr)?); $($rest:tt)*
    ) => {
        $crate::__sliced_parse_uses__!(
            @uses [$($uses)*]
            @states [$($states)* { $name, $style, (($($ty)?), ($($args)?)) }]
            $($rest)*
        )
    };

    // Terminal case: no uses were collected, with or without states.
    // The state list is matched with `*` so that an invocation with neither uses nor states
    // also lands here, rather than matching no rule at all and producing an opaque
    // "no rules expected this token" error.
    (
        @uses []
        @states [$({ $state_name:ident, $state_style:ident, $state_arg:tt })*]
        $($body:tt)*
    ) => {
        {
            // We need at least one use to get a tick, so emit a compile error if there are none
            compile_error!("sliced! requires at least one `let name = use(...)` statement to determine the tick")
        }
    };

    // Terminal case: uses with optional states
    (
        @uses [$({ $use_name:ident, $invocation:expr, $($invocation_spans:expr),* })+]
        @states [$({ $state_name:ident, $state_style:ident, (($($state_ty:ty)?), ($($state_arg:expr)?)) })*]
        $($body:tt)*
    ) => {
        {
            // Bring the style functions into scope so the invocations can name them unqualified
            use $crate::live_collections::sliced::style::*;
            let __styled = (
                $($invocation,)+
            );

            // The tick is derived from the first `use` in the list
            let __tick = $crate::live_collections::sliced::Slicable::create_tick(&__styled.0);
            // One backtrace per use, wrapped in `copy_span!` together with that use's span exprs
            let __backtraces = {
                use $crate::compile::ir::backtrace::__macro_get_backtrace;
                (
                    $($crate::macro_support::copy_span::copy_span!($($invocation_spans,)* {
                        __macro_get_backtrace(1)
                    }),)+
                )
            };
            let __sliced = $crate::live_collections::sliced::Slicable::slice(__styled, &__tick, __backtraces);
            let (
                $($use_name,)+
            ) = __sliced;

            // Create all cycles and pack handles/values into tuples
            let (__handles, __states) = $crate::live_collections::sliced::unzip_cycles((
                $($crate::live_collections::sliced::style::$state_style$(::<$state_ty, _>)?(& __tick, $($state_arg)?),)*
            ));

            // Unpack mutable state values
            let (
                $(mut $state_name,)*
            ) = __states;

            // Execute the body
            let __body_result = {
                $($body)*
            };

            // Re-pack the final state values and complete cycles
            let __final_states = (
                $($state_name,)*
            );
            $crate::live_collections::sliced::complete_cycles(__handles, __final_states);

            // Unslice the result
            $crate::live_collections::sliced::Unslicable::unslice(__body_result)
        }
    };
}
115
/// Runs a computation over a slice of one or more live collections.
/// Typical use: reading a snapshot of an asynchronously updated collection while processing
/// another collection, for example joining a stream with the latest values from a singleton.
///
/// # Syntax
/// The body of `sliced!` looks like the inside of a closure. It begins with one or more `use`
/// statements, each naming a live collection to slice together with a non-determinism
/// explanation, followed by the statements of the transformation itself. A style may be given
/// after `use::` to control how the collection is sliced (e.g., atomically). Every `use`
/// statement has to come before the body.
///
/// ```rust,ignore
/// let stream = sliced! {
///     let name1 = use(collection1, nondet!(/** explanation */));
///     let name2 = use::atomic(collection2, nondet!(/** explanation */));
///
///     // arbitrary statements can follow
///     let intermediate = name1.map(...);
///     intermediate.cross_singleton(name2)
/// };
/// ```
///
/// # Stateful Computations
/// State can be carried across iterations by declaring a `let mut` binding with `use::state`
/// or `use::state_null`. Each such binding creates a cycle that persists its value between
/// iterations.
///
/// - `use::state(|l| initial)`: a cycle with an initial value. The closure is handed the slice
///   location and returns the state seen by the first iteration.
/// - `use::state_null::<Type>()`: a cycle that is null/empty on the first iteration.
///
/// The body may reassign the mutable binding; whatever value it holds at the end of the body is
/// what the next iteration observes.
///
/// ```rust,ignore
/// let counter_stream = sliced! {
///     let batch = use(input_stream, nondet!(/** explanation */));
///     let mut counter = use::state(|l| l.singleton(q!(0)));
///
///     // Increment counter by the number of items in this batch
///     let new_count = counter.clone().zip(batch.count())
///         .map(q!(|(old, add)| old + add));
///     counter = new_count.clone();
///     new_count.into_stream()
/// };
/// ```
#[macro_export]
macro_rules! __sliced__ {
    // Hand every token to the parser macro, starting with empty accumulators.
    ($($input:tt)*) => {
        $crate::__sliced_parse_uses__!(
            @uses []
            @states []
            $($input)*
        )
    };
}
170
171pub use crate::__sliced__ as sliced;
172
173/// Marks this live collection as atomically-yielded, which means that the output outside
174/// `sliced` will be at an atomic location that is synchronous with respect to the body
175/// of the slice.
176pub fn yield_atomic<T>(t: T) -> style::Atomic<T> {
177    style::Atomic {
178        collection: t,
179        // yield_atomic doesn't need a nondet since it's for output, not input
180        nondet: crate::nondet::NonDet,
181    }
182}
183
184/// A trait for live collections which can be sliced into bounded versions at a tick.
185pub trait Slicable<'a, L: Location<'a>> {
186    /// The sliced version of this live collection.
187    type Slice;
188
189    /// The type of backtrace associated with this slice.
190    type Backtrace;
191
192    /// Gets the location associated with this live collection.
193    fn get_location(&self) -> &L;
194
195    /// Creates a tick that is appropriate for the collection's location.
196    fn create_tick(&self) -> Tick<L>
197    where
198        L: NoTick,
199    {
200        self.get_location().tick()
201    }
202
203    /// Slices this live collection at the given tick.
204    ///
205    /// # Non-Determinism
206    /// Slicing a live collection may involve non-determinism, such as choosing which messages
207    /// to include in a batch.
208    fn slice(self, tick: &Tick<L>, backtrace: Self::Backtrace) -> Self::Slice;
209}
210
/// Implemented by sliced live collections that can be yielded out of a slice, returning them
/// to their original (unsliced) form.
pub trait Unslicable {
    /// What this collection becomes once it has left the slice.
    type Unsliced;

    /// Consumes the sliced collection and returns its unsliced counterpart.
    fn unslice(self) -> Self::Unsliced;
}
219
/// Splits a tuple of `(handle, state)` pairs into a tuple of handles and a tuple of states.
#[doc(hidden)]
pub trait UnzipCycles {
    /// Tuple type holding every cycle handle.
    type Handles;
    /// Tuple type holding every state value.
    type States;

    /// Consumes the pairs and returns `(handles, states)`.
    fn unzip(self) -> (Self::Handles, Self::States);
}
231
232/// Unzips a tuple of cycles into handles and states.
233#[doc(hidden)]
234pub fn unzip_cycles<T: UnzipCycles>(cycles: T) -> (T::Handles, T::States) {
235    cycles.unzip()
236}
237
/// Implemented by tuples of cycle handles that can be completed with a matching tuple of final
/// state values.
#[doc(hidden)]
pub trait CompleteCycles<States> {
    /// Consumes the handles and completes every cycle with the given `states`.
    fn complete(self, states: States);
}
244
245/// Completes a tuple of cycle handles with their final state values.
246#[doc(hidden)]
247pub fn complete_cycles<H: CompleteCycles<S>, S>(handles: H, states: S) {
248    handles.complete(states);
249}
250
251impl<'a, L: Location<'a>> Slicable<'a, L> for () {
252    type Slice = ();
253    type Backtrace = ();
254
255    fn get_location(&self) -> &L {
256        unreachable!()
257    }
258
259    fn slice(self, _tick: &Tick<L>, _backtrace: Self::Backtrace) -> Self::Slice {}
260}
261
262impl Unslicable for () {
263    type Unsliced = ();
264
265    fn unslice(self) -> Self::Unsliced {}
266}
267
// Generates `Slicable` and `Unslicable` impls for one tuple arity.
//
// Each tuple element is described by three comma-separated tokens: the element's type parameter,
// an identifier used to bind that element's backtrace, and a positional index. The index is
// accepted by the matcher but not referenced in the expansion.
macro_rules! impl_slicable_for_tuple {
    ($($Elem:ident, $ElemBt:ident, $pos:tt),+) => {
        impl<'a, L, $($Elem),+> Slicable<'a, L> for ($($Elem,)+)
        where
            L: Location<'a>,
            $($Elem: Slicable<'a, L>),+
        {
            type Slice = ($($Elem::Slice,)+);
            type Backtrace = ($($Elem::Backtrace,)+);

            // The tuple reports the location of its first element.
            fn get_location(&self) -> &L {
                self.0.get_location()
            }

            // Type-parameter names are reused as local binding names, hence the lint expectation.
            #[expect(non_snake_case, reason = "macro codegen")]
            fn slice(self, tick: &Tick<L>, backtrace: Self::Backtrace) -> Self::Slice {
                let ($($Elem,)+) = self;
                let ($($ElemBt,)+) = backtrace;
                // Slice every element at the same tick, pairing it with its own backtrace.
                ($($Elem.slice(tick, $ElemBt),)+)
            }
        }

        impl<$($Elem),+> Unslicable for ($($Elem,)+)
        where
            $($Elem: Unslicable),+
        {
            type Unsliced = ($($Elem::Unsliced,)+);

            #[expect(non_snake_case, reason = "macro codegen")]
            fn unslice(self) -> Self::Unsliced {
                let ($($Elem,)+) = self;
                ($($Elem.unslice(),)+)
            }
        }
    };
}
297
// Instantiate the tuple impls of `Slicable` and `Unslicable` for arities 1 through 12.
// Every invocation is gated on the `stageleft_runtime` cfg. Each tuple element is described by
// a triple: its type parameter, the identifier used to bind its backtrace, and a positional
// index.
298#[cfg(stageleft_runtime)]
299impl_slicable_for_tuple!(S1, S1_bt, 0);
300#[cfg(stageleft_runtime)]
301impl_slicable_for_tuple!(S1, S1_bt, 0, S2, S2_bt, 1);
302#[cfg(stageleft_runtime)]
303impl_slicable_for_tuple!(S1, S1_bt, 0, S2, S2_bt, 1, S3, S3_bt, 2);
304#[cfg(stageleft_runtime)]
305impl_slicable_for_tuple!(S1, S1_bt, 0, S2, S2_bt, 1, S3, S3_bt, 2, S4, S4_bt, 3);
306#[cfg(stageleft_runtime)]
307impl_slicable_for_tuple!(
308    S1, S1_bt, 0, S2, S2_bt, 1, S3, S3_bt, 2, S4, S4_bt, 3, S5, S5_bt, 4
309);
310#[cfg(stageleft_runtime)]
311impl_slicable_for_tuple!(
312    S1, S1_bt, 0, S2, S2_bt, 1, S3, S3_bt, 2, S4, S4_bt, 3, S5, S5_bt, 4, S6, S6_bt, 5
313);
314#[cfg(stageleft_runtime)]
315impl_slicable_for_tuple!(
316    S1, S1_bt, 0, S2, S2_bt, 1, S3, S3_bt, 2, S4, S4_bt, 3, S5, S5_bt, 4, S6, S6_bt, 5, S7, S7_bt,
317    6
318);
319#[cfg(stageleft_runtime)]
320impl_slicable_for_tuple!(
321    S1, S1_bt, 0, S2, S2_bt, 1, S3, S3_bt, 2, S4, S4_bt, 3, S5, S5_bt, 4, S6, S6_bt, 5, S7, S7_bt,
322    6, S8, S8_bt, 7
323);
324#[cfg(stageleft_runtime)]
325impl_slicable_for_tuple!(
326    S1, S1_bt, 0, S2, S2_bt, 1, S3, S3_bt, 2, S4, S4_bt, 3, S5, S5_bt, 4, S6, S6_bt, 5, S7, S7_bt,
327    6, S8, S8_bt, 7, S9, S9_bt, 8
328);
329#[cfg(stageleft_runtime)]
330impl_slicable_for_tuple!(
331    S1, S1_bt, 0, S2, S2_bt, 1, S3, S3_bt, 2, S4, S4_bt, 3, S5, S5_bt, 4, S6, S6_bt, 5, S7, S7_bt,
332    6, S8, S8_bt, 7, S9, S9_bt, 8, S10, S10_bt, 9
333);
334#[cfg(stageleft_runtime)]
335impl_slicable_for_tuple!(
336    S1, S1_bt, 0, S2, S2_bt, 1, S3, S3_bt, 2, S4, S4_bt, 3, S5, S5_bt, 4, S6, S6_bt, 5, S7, S7_bt,
337    6, S8, S8_bt, 7, S9, S9_bt, 8, S10, S10_bt, 9, S11, S11_bt, 10
338);
339#[cfg(stageleft_runtime)]
340impl_slicable_for_tuple!(
341    S1, S1_bt, 0, S2, S2_bt, 1, S3, S3_bt, 2, S4, S4_bt, 3, S5, S5_bt, 4, S6, S6_bt, 5, S7, S7_bt,
342    6, S8, S8_bt, 7, S9, S9_bt, 8, S10, S10_bt, 9, S11, S11_bt, 10, S12, S12_bt, 11
343);
344
// Generates `UnzipCycles` and `CompleteCycles` impls for one tuple arity (including zero).
//
// Each tuple element is described by three comma-separated tokens: the handle type parameter,
// the state type parameter, and a positional index. The index is accepted by the matcher but
// not referenced in the expansion.
macro_rules! impl_cycles_for_tuple {
    ($($Handle:ident, $State:ident, $pos:tt),*) => {
        impl<$($Handle, $State),*> UnzipCycles for ($(($Handle, $State),)*) {
            type Handles = ($($Handle,)*);
            type States = ($($State,)*);

            // Type-parameter names are reused as local binding names. `allow` is used instead
            // of `expect` presumably because the zero-arity expansion binds no names at all.
            #[expect(clippy::allow_attributes, reason = "macro codegen")]
            #[allow(non_snake_case, reason = "macro codegen")]
            fn unzip(self) -> (Self::Handles, Self::States) {
                // Take every pair apart up front, then regroup handles and states separately.
                let ($(($Handle, $State),)*) = self;
                let handles = ($($Handle,)*);
                let states = ($($State,)*);
                (handles, states)
            }
        }

        impl<$($Handle: crate::forward_handle::CompleteCycle<$State>, $State),*> CompleteCycles<($($State,)*)> for ($($Handle,)*) {
            #[expect(clippy::allow_attributes, reason = "macro codegen")]
            #[allow(non_snake_case, reason = "macro codegen")]
            fn complete(self, states: ($($State,)*)) {
                let ($($State,)*) = states;
                let ($($Handle,)*) = self;
                // Feed each handle its matching state, in tuple order.
                $($Handle.complete_next_tick($State);)*
            }
        }
    };
}
373
// Instantiate the tuple impls of `UnzipCycles` and `CompleteCycles` for arities 0 through 12.
// Every invocation is gated on the `stageleft_runtime` cfg. Each tuple element is described by
// a triple: the handle type parameter, the state type parameter, and a positional index.
374#[cfg(stageleft_runtime)]
375impl_cycles_for_tuple!();
376#[cfg(stageleft_runtime)]
377impl_cycles_for_tuple!(H1, S1, 0);
378#[cfg(stageleft_runtime)]
379impl_cycles_for_tuple!(H1, S1, 0, H2, S2, 1);
380#[cfg(stageleft_runtime)]
381impl_cycles_for_tuple!(H1, S1, 0, H2, S2, 1, H3, S3, 2);
382#[cfg(stageleft_runtime)]
383impl_cycles_for_tuple!(H1, S1, 0, H2, S2, 1, H3, S3, 2, H4, S4, 3);
384#[cfg(stageleft_runtime)]
385impl_cycles_for_tuple!(H1, S1, 0, H2, S2, 1, H3, S3, 2, H4, S4, 3, H5, S5, 4);
386#[cfg(stageleft_runtime)]
387impl_cycles_for_tuple!(
388    H1, S1, 0, H2, S2, 1, H3, S3, 2, H4, S4, 3, H5, S5, 4, H6, S6, 5
389);
390#[cfg(stageleft_runtime)]
391impl_cycles_for_tuple!(
392    H1, S1, 0, H2, S2, 1, H3, S3, 2, H4, S4, 3, H5, S5, 4, H6, S6, 5, H7, S7, 6
393);
394#[cfg(stageleft_runtime)]
395impl_cycles_for_tuple!(
396    H1, S1, 0, H2, S2, 1, H3, S3, 2, H4, S4, 3, H5, S5, 4, H6, S6, 5, H7, S7, 6, H8, S8, 7
397);
398#[cfg(stageleft_runtime)]
399impl_cycles_for_tuple!(
400    H1, S1, 0, H2, S2, 1, H3, S3, 2, H4, S4, 3, H5, S5, 4, H6, S6, 5, H7, S7, 6, H8, S8, 7, H9, S9,
401    8
402);
403#[cfg(stageleft_runtime)]
404impl_cycles_for_tuple!(
405    H1, S1, 0, H2, S2, 1, H3, S3, 2, H4, S4, 3, H5, S5, 4, H6, S6, 5, H7, S7, 6, H8, S8, 7, H9, S9,
406    8, H10, S10, 9
407);
408#[cfg(stageleft_runtime)]
409impl_cycles_for_tuple!(
410    H1, S1, 0, H2, S2, 1, H3, S3, 2, H4, S4, 3, H5, S5, 4, H6, S6, 5, H7, S7, 6, H8, S8, 7, H9, S9,
411    8, H10, S10, 9, H11, S11, 10
412);
413#[cfg(stageleft_runtime)]
414impl_cycles_for_tuple!(
415    H1, S1, 0, H2, S2, 1, H3, S3, 2, H4, S4, 3, H5, S5, 4, H6, S6, 5, H7, S7, 6, H8, S8, 7, H9, S9,
416    8, H10, S10, 9, H11, S11, 10, H12, S12, 11
417);
418
419// Unslicable implementations for plain collections (used when returning from sliced! body)
420impl<'a, T, L: Location<'a>, O: Ordering, R: Retries> Unslicable
421    for super::Stream<T, Tick<L>, Bounded, O, R>
422{
423    type Unsliced = super::Stream<T, L, Unbounded, O, R>;
424
425    fn unslice(self) -> Self::Unsliced {
426        self.all_ticks()
427    }
428}
429
430impl<'a, T, L: Location<'a>> Unslicable for super::Singleton<T, Tick<L>, Bounded> {
431    type Unsliced = super::Singleton<T, L, Unbounded>;
432
433    fn unslice(self) -> Self::Unsliced {
434        self.latest()
435    }
436}
437
438impl<'a, T, L: Location<'a>> Unslicable for super::Optional<T, Tick<L>, Bounded> {
439    type Unsliced = super::Optional<T, L, Unbounded>;
440
441    fn unslice(self) -> Self::Unsliced {
442        self.latest()
443    }
444}
445
446impl<'a, K, V, L: Location<'a>, O: Ordering, R: Retries> Unslicable
447    for super::KeyedStream<K, V, Tick<L>, Bounded, O, R>
448{
449    type Unsliced = super::KeyedStream<K, V, L, Unbounded, O, R>;
450
451    fn unslice(self) -> Self::Unsliced {
452        self.all_ticks()
453    }
454}
455
456// Unslicable implementations for Atomic-wrapped bounded collections
457impl<'a, T, L: Location<'a> + NoTick, O: Ordering, R: Retries> Unslicable
458    for style::Atomic<super::Stream<T, Tick<L>, Bounded, O, R>>
459{
460    type Unsliced = super::Stream<T, crate::location::Atomic<L>, Unbounded, O, R>;
461
462    fn unslice(self) -> Self::Unsliced {
463        self.collection.all_ticks_atomic()
464    }
465}
466
467impl<'a, T, L: Location<'a> + NoTick> Unslicable
468    for style::Atomic<super::Singleton<T, Tick<L>, Bounded>>
469{
470    type Unsliced = super::Singleton<T, crate::location::Atomic<L>, Unbounded>;
471
472    fn unslice(self) -> Self::Unsliced {
473        self.collection.latest_atomic()
474    }
475}
476
477impl<'a, T, L: Location<'a> + NoTick> Unslicable
478    for style::Atomic<super::Optional<T, Tick<L>, Bounded>>
479{
480    type Unsliced = super::Optional<T, crate::location::Atomic<L>, Unbounded>;
481
482    fn unslice(self) -> Self::Unsliced {
483        self.collection.latest_atomic()
484    }
485}
486
487impl<'a, K, V, L: Location<'a> + NoTick, O: Ordering, R: Retries> Unslicable
488    for style::Atomic<super::KeyedStream<K, V, Tick<L>, Bounded, O, R>>
489{
490    type Unsliced = super::KeyedStream<K, V, crate::location::Atomic<L>, Unbounded, O, R>;
491
492    fn unslice(self) -> Self::Unsliced {
493        self.collection.all_ticks_atomic()
494    }
495}
496
// Simulation-driven tests for `sliced!`. The whole module is compiled only for test builds with
// the `sim` feature enabled, so individual tests need no feature gate of their own.
#[cfg(feature = "sim")]
#[cfg(test)]
mod tests {
    use stageleft::q;

    use super::sliced;
    use crate::location::Location;
    use crate::nondet::nondet;
    use crate::prelude::FlowBuilder;

    /// Test a counter using `use::state` with an initial singleton value.
    /// Each input increments the counter, and we verify the output after each tick.
    #[test]
    fn sim_state_counter() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (input_send, input) = node.sim_input::<i32, _, _>();

        let out_recv = sliced! {
            let batch = use(input, nondet!(/** test */));
            let mut counter = use::state(|l| l.singleton(q!(0)));

            let new_count = counter.clone().zip(batch.count())
                .map(q!(|(old, add)| old + add));
            counter = new_count.clone();
            new_count.into_stream()
        }
        .sim_output();

        flow.sim().exhaustive(async || {
            input_send.send(1);
            assert_eq!(out_recv.next().await.unwrap(), 1);

            input_send.send(1);
            assert_eq!(out_recv.next().await.unwrap(), 2);

            input_send.send(1);
            assert_eq!(out_recv.next().await.unwrap(), 3);
        });
    }

    /// Test `use::state_null` with an Optional that starts as None.
    #[test]
    fn sim_state_null_optional() {
        // `Location` is already imported at module level; only the extra names are pulled in.
        use crate::live_collections::Optional;
        use crate::live_collections::boundedness::Bounded;
        use crate::location::Tick;

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (input_send, input) = node.sim_input::<i32, _, _>();

        let out_recv = sliced! {
            let batch = use(input, nondet!(/** test */));
            let mut prev = use::state_null::<Optional<i32, Tick<_>, Bounded>>();

            // Output the previous value (or -1 if none)
            let output = prev.clone().unwrap_or(prev.location().singleton(q!(-1)));
            // Store the current batch's first value for next tick
            prev = batch.first();
            output.into_stream()
        }
        .sim_output();

        flow.sim().exhaustive(async || {
            input_send.send(10);
            // First tick: prev is None, so output is -1
            assert_eq!(out_recv.next().await.unwrap(), -1);

            input_send.send(20);
            // Second tick: prev is Some(10), so output is 10
            assert_eq!(out_recv.next().await.unwrap(), 10);

            input_send.send(30);
            // Third tick: prev is Some(20), so output is 20
            assert_eq!(out_recv.next().await.unwrap(), 20);
        });
    }

    /// Test `use::state` with `source_iter` to initialize a stream state.
    /// On the first tick, the state is the initial `[10, 20]` from `source_iter`.
    /// On subsequent ticks, the state is the batch from the previous tick.
    #[test]
    fn sim_state_source_iter() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (input_send, input) = node.sim_input::<i32, _, _>();

        let out_recv = sliced! {
            let batch = use(input, nondet!(/** test */));
            let mut items = use::state(|l| l.source_iter(q!([10, 20])));

            // Output the current state, then replace it with the batch
            let output = items.clone();
            items = batch;
            output
        }
        .sim_output();

        flow.sim().exhaustive(async || {
            input_send.send(3);
            // First tick: items = initial [10, 20], output = [10, 20]
            let mut results = vec![];
            results.push(out_recv.next().await.unwrap());
            results.push(out_recv.next().await.unwrap());
            results.sort();
            assert_eq!(results, vec![10, 20]);

            input_send.send(4);
            // Second tick: items = [3] (from previous batch), output = [3]
            assert_eq!(out_recv.next().await.unwrap(), 3);

            input_send.send(5);
            // Third tick: items = [4] (from previous batch), output = [4]
            assert_eq!(out_recv.next().await.unwrap(), 4);
        });
    }

    /// Test atomic slicing with keyed streams.
    #[test]
    fn sim_sliced_atomic_keyed_stream() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (input_send, input) = node.sim_input::<(i32, i32), _, _>();
        let atomic_keyed_input = input.into_keyed().atomic();
        // Running per-key sum of every value received so far
        let accumulated_inputs = atomic_keyed_input
            .clone()
            .assume_ordering(nondet!(/** Test */))
            .fold(
                q!(|| 0),
                q!(|curr, new| {
                    *curr += new;
                }),
            );

        let out_recv = sliced! {
            let atomic_keyed_input = use::atomic(atomic_keyed_input, nondet!(/** test */));
            let accumulated_inputs = use::atomic(accumulated_inputs, nondet!(/** test */));
            accumulated_inputs.join_keyed_stream(atomic_keyed_input)
                .map(q!(|(sum, _input)| sum))
                .entries()
        }
        .assume_ordering_trusted(nondet!(/** test */))
        .sim_output();

        flow.sim().exhaustive(async || {
            input_send.send((1, 1));
            assert_eq!(out_recv.next().await.unwrap(), (1, 1));

            input_send.send((1, 2));
            assert_eq!(out_recv.next().await.unwrap(), (1, 3));

            input_send.send((2, 1));
            assert_eq!(out_recv.next().await.unwrap(), (2, 1));

            input_send.send((1, 3));
            assert_eq!(out_recv.next().await.unwrap(), (1, 6));
        });
    }
}
661}