// hydro_lang/live_collections/stream/mod.rs
1//! Definitions for the [`Stream`] live collection.
2
3use std::cell::RefCell;
4use std::future::Future;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, QuotedWithContextWithProps, q, quote_type};
11use tokio::time::Instant;
12
13use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
14use super::keyed_singleton::KeyedSingleton;
15use super::keyed_stream::{Generate, KeyedStream};
16use super::optional::Optional;
17use super::singleton::Singleton;
18use crate::compile::builder::{CycleId, FlowState};
19use crate::compile::ir::{
20 CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode, StreamOrder, StreamRetry,
21};
22#[cfg(stageleft_runtime)]
23use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
24use crate::forward_handle::{ForwardRef, TickCycle};
25use crate::live_collections::batch_atomic::BatchAtomic;
26#[cfg(stageleft_runtime)]
27use crate::location::dynamic::{DynLocation, LocationId};
28use crate::location::tick::{Atomic, DeferTick, NoAtomic};
29use crate::location::{Location, NoTick, Tick, check_matching_location};
30use crate::manual_expr::ManualExpr;
31use crate::nondet::{NonDet, nondet};
32use crate::prelude::manual_proof;
33use crate::properties::{AggFuncAlgebra, ValidCommutativityFor, ValidIdempotenceFor};
34
/// Networking-related definitions for [`Stream`]s (see the submodule for details).
pub mod networking;
36
/// A trait implemented by valid ordering markers ([`TotalOrder`] and [`NoOrder`]).
///
/// The supertrait bounds encode, at the type level, how each marker combines with the
/// others: the minimum of an ordering with itself is itself, combining with
/// [`TotalOrder`] preserves the ordering, and combining with [`NoOrder`] always
/// degrades to [`NoOrder`].
#[sealed::sealed]
pub trait Ordering:
    MinOrder<Self, Min = Self> + MinOrder<TotalOrder, Min = Self> + MinOrder<NoOrder, Min = NoOrder>
{
    /// The [`StreamOrder`] corresponding to this type.
    const ORDERING_KIND: StreamOrder;
}
45
/// Marks the stream as being totally ordered, which means that there are
/// no sources of non-determinism (other than intentional ones) that will
/// affect the order of elements.
// Uninhabited enum: exists purely as a type-level marker and is never instantiated.
pub enum TotalOrder {}

#[sealed::sealed]
impl Ordering for TotalOrder {
    const ORDERING_KIND: StreamOrder = StreamOrder::TotalOrder;
}
55
/// Marks the stream as having no order, which means that the order of
/// elements may be affected by non-determinism.
///
/// This restricts certain operators, such as `fold` and `reduce`, to only
/// be used with commutative aggregation functions.
// Uninhabited enum: exists purely as a type-level marker and is never instantiated.
pub enum NoOrder {}

#[sealed::sealed]
impl Ordering for NoOrder {
    const ORDERING_KIND: StreamOrder = StreamOrder::NoOrder;
}
67
/// Marker trait for an [`Ordering`] that is available when `Self` is a weaker guarantee than
/// `Other`, which means that a stream with `Other` guarantees can be safely converted to
/// have `Self` guarantees instead.
#[sealed::sealed]
pub trait WeakerOrderingThan<Other: ?Sized>: Ordering {}
// `Self` is weaker than (or equal to) `O2` exactly when taking the minimum of the two
// orderings yields `Self` back.
#[sealed::sealed]
impl<O: Ordering, O2: Ordering> WeakerOrderingThan<O2> for O where O: MinOrder<O2, Min = O> {}
75
76/// Helper trait for determining the weakest of two orderings.
77#[sealed::sealed]
78pub trait MinOrder<Other: ?Sized> {
79 /// The weaker of the two orderings.
80 type Min: Ordering;
81}
82
83#[sealed::sealed]
84impl<O: Ordering> MinOrder<O> for TotalOrder {
85 type Min = O;
86}
87
88#[sealed::sealed]
89impl<O: Ordering> MinOrder<O> for NoOrder {
90 type Min = NoOrder;
91}
92
/// A trait implemented by valid retries markers ([`ExactlyOnce`] and [`AtLeastOnce`]).
///
/// The supertrait bounds encode how each marker combines with the others: the minimum
/// of a retry guarantee with itself is itself, combining with [`ExactlyOnce`] preserves
/// the guarantee, and combining with [`AtLeastOnce`] always degrades to [`AtLeastOnce`].
#[sealed::sealed]
pub trait Retries:
    MinRetries<Self, Min = Self>
    + MinRetries<ExactlyOnce, Min = Self>
    + MinRetries<AtLeastOnce, Min = AtLeastOnce>
{
    /// The [`StreamRetry`] corresponding to this type.
    const RETRIES_KIND: StreamRetry;
}
103
/// Marks the stream as having deterministic message cardinality, with no
/// possibility of duplicates.
// Uninhabited enum: exists purely as a type-level marker and is never instantiated.
pub enum ExactlyOnce {}

#[sealed::sealed]
impl Retries for ExactlyOnce {
    const RETRIES_KIND: StreamRetry = StreamRetry::ExactlyOnce;
}

/// Marks the stream as having non-deterministic message cardinality, which
/// means that duplicates may occur, but messages will not be dropped.
// Uninhabited enum: exists purely as a type-level marker and is never instantiated.
pub enum AtLeastOnce {}

#[sealed::sealed]
impl Retries for AtLeastOnce {
    const RETRIES_KIND: StreamRetry = StreamRetry::AtLeastOnce;
}
121
/// Marker trait for a [`Retries`] that is available when `Self` is a weaker guarantee than
/// `Other`, which means that a stream with `Other` guarantees can be safely converted to
/// have `Self` guarantees instead.
#[sealed::sealed]
pub trait WeakerRetryThan<Other: ?Sized>: Retries {}
// `Self` is weaker than (or equal to) `R2` exactly when taking the minimum of the two
// retry guarantees yields `Self` back.
#[sealed::sealed]
impl<R: Retries, R2: Retries> WeakerRetryThan<R2> for R where R: MinRetries<R2, Min = R> {}
129
/// Helper trait for determining the weakest of two retry guarantees.
#[sealed::sealed]
pub trait MinRetries<Other: ?Sized> {
    /// The weaker of the two retry guarantees.
    ///
    /// The `WeakerRetryThan` bounds record that the combined guarantee is at most as
    /// strong as either input, so downstream code can weaken a stream to the combined
    /// guarantee without restating these obligations.
    type Min: Retries + WeakerRetryThan<Self> + WeakerRetryThan<Other>;
}

#[sealed::sealed]
impl<R: Retries> MinRetries<R> for ExactlyOnce {
    // `ExactlyOnce` is the strongest guarantee, so the minimum is always the other side.
    type Min = R;
}

#[sealed::sealed]
impl<R: Retries> MinRetries<R> for AtLeastOnce {
    // `AtLeastOnce` is the weakest guarantee, so it absorbs anything it is combined with.
    type Min = AtLeastOnce;
}
146
#[sealed::sealed]
#[diagnostic::on_unimplemented(
    message = "The input stream must be totally-ordered (`TotalOrder`), but has order `{Self}`. Strengthen the order upstream or consider a different API.",
    label = "required here",
    note = "To intentionally process the stream by observing a non-deterministic (shuffled) order of elements, use `.assume_ordering`. This introduces non-determinism so avoid unless necessary."
)]
/// Marker trait that is implemented for the [`TotalOrder`] ordering guarantee.
pub trait IsOrdered: Ordering {}

#[sealed::sealed]
// `do_not_recommend` hides this impl from rustc's suggestions, so failures surface the
// `on_unimplemented` message above instead of pointing at the impl.
#[diagnostic::do_not_recommend]
impl IsOrdered for TotalOrder {}

#[sealed::sealed]
#[diagnostic::on_unimplemented(
    message = "The input stream must be exactly-once (`ExactlyOnce`), but has retries `{Self}`. Strengthen the retries guarantee upstream or consider a different API.",
    label = "required here",
    note = "To intentionally process the stream by observing non-deterministic (randomly duplicated) retries, use `.assume_retries`. This introduces non-determinism so avoid unless necessary."
)]
/// Marker trait that is implemented for the [`ExactlyOnce`] retries guarantee.
pub trait IsExactlyOnce: Retries {}

#[sealed::sealed]
// See the note on the `IsOrdered` impl above.
#[diagnostic::do_not_recommend]
impl IsExactlyOnce for ExactlyOnce {}
172
/// Streaming sequence of elements with type `Type`.
///
/// This live collection represents a growing sequence of elements, with new elements being
/// asynchronously appended to the end of the sequence. This can be used to model the arrival
/// of network input, such as API requests, or streaming ingestion.
///
/// By default, all streams have deterministic ordering and each element is materialized exactly
/// once. But streams can also capture non-determinism via the `Order` and `Retries` type
/// parameters. When the ordering / retries guarantee is relaxed, fewer APIs will be available
/// on the stream. For example, if the stream is unordered, you cannot invoke [`Stream::first`].
///
/// Type Parameters:
/// - `Type`: the type of elements in the stream
/// - `Loc`: the location where the stream is being materialized
/// - `Bound`: the boundedness of the stream, which is either [`Bounded`] or [`Unbounded`]
/// - `Order`: the ordering of the stream, which is either [`TotalOrder`] or [`NoOrder`]
///   (default is [`TotalOrder`])
/// - `Retries`: the retry guarantee of the stream, which is either [`ExactlyOnce`] or
///   [`AtLeastOnce`] (default is [`ExactlyOnce`])
pub struct Stream<
    Type,
    Loc,
    Bound: Boundedness = Unbounded,
    Order: Ordering = TotalOrder,
    Retry: Retries = ExactlyOnce,
> {
    // The location where this stream is materialized.
    pub(crate) location: Loc,
    // The IR node producing this stream; replaced with `Placeholder` when the stream is
    // consumed by an operator (see the various `ir_node.replace(...)` call sites).
    pub(crate) ir_node: RefCell<HydroNode>,
    // Shared builder state for the flow graph; used to register roots (see `Drop`).
    pub(crate) flow_state: FlowState,

    _phantom: PhantomData<(Type, Loc, Bound, Order, Retry)>,
}
205
impl<T, L, B: Boundedness, O: Ordering, R: Retries> Drop for Stream<T, L, B, O, R> {
    // If a stream handle is dropped without being consumed, its IR node would otherwise be
    // lost from the flow graph. Attach it to a `Null` root instead, unless the node was
    // already consumed (left as `Placeholder`) or is still shared with other live handles.
    fn drop(&mut self) {
        let ir_node = self.ir_node.replace(HydroNode::Placeholder);
        if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
            self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
                input: Box::new(ir_node),
                op_metadata: HydroIrOpMetadata::new(),
            });
        }
    }
}
217
// A `Bounded` stream can always be treated as `Unbounded`; the `Cast` IR node records
// the change of the collection kind in the graph.
impl<'a, T, L, O: Ordering, R: Retries> From<Stream<T, L, Bounded, O, R>>
    for Stream<T, L, Unbounded, O, R>
where
    L: Location<'a>,
{
    fn from(stream: Stream<T, L, Bounded, O, R>) -> Stream<T, L, Unbounded, O, R> {
        // Metadata must describe the *target* (unbounded) collection kind.
        let new_meta = stream
            .location
            .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind());

        Stream {
            location: stream.location.clone(),
            flow_state: stream.flow_state.clone(),
            ir_node: RefCell::new(HydroNode::Cast {
                inner: Box::new(stream.ir_node.replace(HydroNode::Placeholder)),
                metadata: new_meta,
            }),
            _phantom: PhantomData,
        }
    }
}
239
// Weakening conversions: a totally-ordered stream is trivially also a valid unordered
// stream, and an exactly-once stream is trivially also a valid at-least-once stream.
impl<'a, T, L, B: Boundedness, R: Retries> From<Stream<T, L, B, TotalOrder, R>>
    for Stream<T, L, B, NoOrder, R>
where
    L: Location<'a>,
{
    fn from(stream: Stream<T, L, B, TotalOrder, R>) -> Stream<T, L, B, NoOrder, R> {
        stream.weaken_ordering()
    }
}

impl<'a, T, L, B: Boundedness, O: Ordering> From<Stream<T, L, B, O, ExactlyOnce>>
    for Stream<T, L, B, O, AtLeastOnce>
where
    L: Location<'a>,
{
    fn from(stream: Stream<T, L, B, O, ExactlyOnce>) -> Stream<T, L, B, O, AtLeastOnce> {
        stream.weaken_retries()
    }
}
259
impl<'a, T, L, O: Ordering, R: Retries> DeferTick for Stream<T, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    fn defer_tick(self) -> Self {
        // Delegates to the inherent `Stream::defer_tick` method (fully-qualified to make
        // clear this is not a recursive trait call).
        Stream::defer_tick(self)
    }
}
268
// Allows a bounded tick-local stream to be the receiving end of a `TickCycle`
// (a cycle whose values written in one tick are read in the next).
impl<'a, T, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
    for Stream<T, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    type Location = Tick<L>;

    fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
        Stream::new(
            location.clone(),
            HydroNode::CycleSource {
                cycle_id,
                metadata: location.new_node_metadata(Self::collection_kind()),
            },
        )
    }
}
286
impl<'a, T, L, O: Ordering, R: Retries> CycleCollectionWithInitial<'a, TickCycle>
    for Stream<T, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    type Location = Tick<L>;

    fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
        // Read the cycle's contents from the *previous* tick (via `DeferTick`), so that
        // values written at the end of tick `t` are observed during tick `t + 1`.
        let from_previous_tick: Stream<T, Tick<L>, Bounded, O, R> = Stream::new(
            location.clone(),
            HydroNode::DeferTick {
                input: Box::new(HydroNode::CycleSource {
                    cycle_id,
                    metadata: location.new_node_metadata(Self::collection_kind()),
                }),
                metadata: location.new_node_metadata(Self::collection_kind()),
            },
        );

        // The initial contents are only released on the very first tick.
        from_previous_tick.chain(initial.filter_if(location.optional_first_tick(q!(())).is_some()))
    }
}
309
impl<'a, T, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
    for Stream<T, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    // Closes a tick cycle: the contents of `self` become the values read back from the
    // matching `CycleSource`. Panics if `self` lives at a different location than the
    // source was created at.
    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
        assert_eq!(
            Location::id(&self.location),
            expected_location,
            "locations do not match"
        );
        self.location
            .flow_state()
            .borrow_mut()
            .push_root(HydroRoot::CycleSink {
                cycle_id,
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                op_metadata: HydroIrOpMetadata::new(),
            });
    }
}
331
// Allows any non-tick stream to be the receiving end of a `ForwardRef`
// (a forward declaration that is completed later in the program).
impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
    for Stream<T, L, B, O, R>
where
    L: Location<'a> + NoTick,
{
    type Location = L;

    fn create_source(cycle_id: CycleId, location: L) -> Self {
        Stream::new(
            location.clone(),
            HydroNode::CycleSource {
                cycle_id,
                metadata: location.new_node_metadata(Self::collection_kind()),
            },
        )
    }
}
349
impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
    for Stream<T, L, B, O, R>
where
    L: Location<'a> + NoTick,
{
    // Completes a forward reference: the contents of `self` become the values emitted by
    // the matching `CycleSource`. Panics if `self` lives at a different location than the
    // forward reference was created at.
    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
        assert_eq!(
            Location::id(&self.location),
            expected_location,
            "locations do not match"
        );
        self.location
            .flow_state()
            .borrow_mut()
            .push_root(HydroRoot::CycleSink {
                cycle_id,
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                op_metadata: HydroIrOpMetadata::new(),
            });
    }
}
371
impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Clone for Stream<T, L, B, O, R>
where
    T: Clone,
    L: Location<'a>,
{
    fn clone(&self) -> Self {
        // On the first clone, convert this handle's node into a `Tee` in place so that
        // this handle and all clones share one underlying IR node.
        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
            *self.ir_node.borrow_mut() = HydroNode::Tee {
                inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
                metadata: self.location.new_node_metadata(Self::collection_kind()),
            };
        }

        // `self.ir_node` is now guaranteed to be a `Tee`; hand out another handle to the
        // same shared inner node (bumping the `Rc` count, not copying the subtree).
        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
            Stream {
                location: self.location.clone(),
                flow_state: self.flow_state.clone(),
                ir_node: HydroNode::Tee {
                    inner: SharedNode(inner.0.clone()),
                    metadata: metadata.clone(),
                }
                .into(),
                _phantom: PhantomData,
            }
        } else {
            unreachable!()
        }
    }
}
402
403impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
404where
405 L: Location<'a>,
406{
    /// Wraps a raw [`HydroNode`] into a typed [`Stream`] handle at `location`.
    ///
    /// Debug-asserts that the node's recorded location and collection kind agree with
    /// the type parameters of the stream being constructed.
    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
        debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());

        let flow_state = location.flow_state().clone();
        Stream {
            location,
            flow_state,
            ir_node: RefCell::new(ir_node),
            _phantom: PhantomData,
        }
    }

    /// Returns the [`Location`] where this stream is being materialized.
    pub fn location(&self) -> &L {
        &self.location
    }

    /// The [`CollectionKind`] describing this stream's static guarantees (boundedness,
    /// ordering, retries) and element type, used when creating IR node metadata.
    pub(crate) fn collection_kind() -> CollectionKind {
        CollectionKind::Stream {
            bound: B::BOUND_KIND,
            order: O::ORDERING_KIND,
            retry: R::RETRIES_KIND,
            element_type: quote_type::<T>().into(),
        }
    }
433
    /// Produces a stream based on invoking `f` on each element.
    /// If you do not want to modify the stream and instead only want to view
    /// each item use [`Stream::inspect`] instead.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let words = process.source_iter(q!(vec!["hello", "world"]));
    /// words.map(q!(|x| x.to_uppercase()))
    /// # }, |mut stream| async move {
    /// # for w in vec!["HELLO", "WORLD"] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
    where
        F: Fn(T) -> U + 'a,
    {
        // Splice the quoted closure with this location's context, then emit a `Map` node
        // whose metadata reflects the new element type `U`.
        let f = f.splice_fn1_ctx(&self.location).into();
        Stream::new(
            self.location.clone(),
            HydroNode::Map {
                f,
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                metadata: self
                    .location
                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
            },
        )
    }
469
    /// For each item `i` in the input stream, transform `i` using `f` and then treat the
    /// result as an [`Iterator`] to produce items one by one. The implementation for [`Iterator`]
    /// for the output type `U` must produce items in a **deterministic** order.
    ///
    /// For example, `U` could be a `Vec`, but not a `HashSet`. If the order of the items in `U` is
    /// not deterministic, use [`Stream::flat_map_unordered`] instead.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
    ///     .flat_map_ordered(q!(|x| x))
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, 4
    /// # for w in (1..5) {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn flat_map_ordered<U, I, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
    where
        I: IntoIterator<Item = U>,
        F: Fn(T) -> I + 'a,
    {
        // Deterministic iteration order lets the output keep this stream's `O` ordering.
        let f = f.splice_fn1_ctx(&self.location).into();
        Stream::new(
            self.location.clone(),
            HydroNode::FlatMap {
                f,
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                metadata: self
                    .location
                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
            },
        )
    }
511
    /// Like [`Stream::flat_map_ordered`], but allows the implementation of [`Iterator`]
    /// for the output type `U` to produce items in any order.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
    /// process
    ///     .source_iter(q!(vec![
    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
    ///         std::collections::HashSet::from_iter(vec![3, 4]),
    ///     ]))
    ///     .flat_map_unordered(q!(|x| x))
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, 4, but in no particular order
    /// # let mut results = Vec::new();
    /// # for w in (1..5) {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![1, 2, 3, 4]);
    /// # }));
    /// # }
    /// ```
    pub fn flat_map_unordered<U, I, F>(
        self,
        f: impl IntoQuotedMut<'a, F, L>,
    ) -> Stream<U, L, B, NoOrder, R>
    where
        I: IntoIterator<Item = U>,
        F: Fn(T) -> I + 'a,
    {
        // The iterator's order may be non-deterministic, so the output is `NoOrder`.
        let f = f.splice_fn1_ctx(&self.location).into();
        Stream::new(
            self.location.clone(),
            HydroNode::FlatMap {
                f,
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                metadata: self
                    .location
                    .new_node_metadata(Stream::<U, L, B, NoOrder, R>::collection_kind()),
            },
        )
    }
558
    /// For each item `i` in the input stream, treat `i` as an [`Iterator`] and produce its items one by one.
    /// The implementation for [`Iterator`] for the element type `T` must produce items in a **deterministic** order.
    ///
    /// For example, `T` could be a `Vec`, but not a `HashSet`. If the order of the items in `T` is
    /// not deterministic, use [`Stream::flatten_unordered`] instead.
    ///
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
    ///     .flatten_ordered()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, 4
    /// # for w in (1..5) {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, O, R>
    where
        T: IntoIterator<Item = U>,
    {
        // Shorthand for flat-mapping with the identity function.
        self.flat_map_ordered(q!(|d| d))
    }

    /// Like [`Stream::flatten_ordered`], but allows the implementation of [`Iterator`]
    /// for the element type `T` to produce items in any order.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
    /// process
    ///     .source_iter(q!(vec![
    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
    ///         std::collections::HashSet::from_iter(vec![3, 4]),
    ///     ]))
    ///     .flatten_unordered()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, 4, but in no particular order
    /// # let mut results = Vec::new();
    /// # for w in (1..5) {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![1, 2, 3, 4]);
    /// # }));
    /// # }
    /// ```
    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, R>
    where
        T: IntoIterator<Item = U>,
    {
        // Shorthand for flat-mapping with the identity function.
        self.flat_map_unordered(q!(|d| d))
    }
620
    /// Creates a stream containing only the elements of the input stream that satisfy a predicate
    /// `f`, preserving the order of the elements.
    ///
    /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
    /// not modify or take ownership of the values. If you need to modify the values while filtering
    /// use [`Stream::filter_map`] instead.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec![1, 2, 3, 4]))
    ///     .filter(q!(|&x| x > 2))
    /// # }, |mut stream| async move {
    /// // 3, 4
    /// # for w in (3..5) {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
    where
        F: Fn(&T) -> bool + 'a,
    {
        // `splice_fn1_borrow_ctx` stages a closure that takes its argument by reference.
        let f = f.splice_fn1_borrow_ctx(&self.location).into();
        Stream::new(
            self.location.clone(),
            HydroNode::Filter {
                f,
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                metadata: self.location.new_node_metadata(Self::collection_kind()),
            },
        )
    }
659
    /// Splits the stream into two streams based on a predicate, without cloning elements.
    ///
    /// Elements for which `f` returns `true` are sent to the first output stream,
    /// and elements for which `f` returns `false` are sent to the second output stream.
    ///
    /// Unlike using `filter` twice, this only evaluates the predicate once per element
    /// and does not require `T: Clone`.
    ///
    /// The closure `f` receives a reference `&T` rather than an owned value `T` because
    /// the predicate is only used for routing; the element itself is moved to the
    /// appropriate output stream.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use hydro_lang::live_collections::stream::{NoOrder, ExactlyOnce};
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
    /// let numbers: Stream<_, _, Unbounded> = process.source_iter(q!(vec![1, 2, 3, 4, 5, 6])).into();
    /// let (evens, odds) = numbers.partition(q!(|&x| x % 2 == 0));
    /// // evens: 2, 4, 6 tagged with true; odds: 1, 3, 5 tagged with false
    /// evens.map(q!(|x| (x, true)))
    ///     .merge_unordered(odds.map(q!(|x| (x, false))))
    /// # }, |mut stream| async move {
    /// # let mut results = Vec::new();
    /// # for _ in 0..6 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, false), (2, true), (3, false), (4, true), (5, false), (6, true)]);
    /// # }));
    /// # }
    /// ```
    #[expect(
        clippy::type_complexity,
        reason = "return type mirrors the input stream type"
    )]
    pub fn partition<F>(
        self,
        f: impl IntoQuotedMut<'a, F, L>,
    ) -> (Stream<T, L, B, O, R>, Stream<T, L, B, O, R>)
    where
        F: Fn(&T) -> bool + 'a,
    {
        let f: crate::compile::ir::DebugExpr = f.splice_fn1_borrow_ctx(&self.location).into();
        // Both outputs share the same upstream node, like a `Tee` but with routing.
        let shared = SharedNode(Rc::new(RefCell::new(
            self.ir_node.replace(HydroNode::Placeholder),
        )));

        // `is_true` selects which side of the predicate this output observes.
        let true_stream = Stream::new(
            self.location.clone(),
            HydroNode::Partition {
                inner: SharedNode(shared.0.clone()),
                f: f.clone(),
                is_true: true,
                metadata: self.location.new_node_metadata(Self::collection_kind()),
            },
        );

        let false_stream = Stream::new(
            self.location.clone(),
            HydroNode::Partition {
                inner: SharedNode(shared.0),
                f,
                is_true: false,
                metadata: self.location.new_node_metadata(Self::collection_kind()),
            },
        );

        (true_stream, false_stream)
    }
732
    /// An operator that both filters and maps. It yields only the items for which the supplied closure `f` returns `Some(value)`.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!(vec!["1", "hello", "world", "2"]))
    ///     .filter_map(q!(|s| s.parse::<usize>().ok()))
    /// # }, |mut stream| async move {
    /// // 1, 2
    /// # for w in (1..3) {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Stream<U, L, B, O, R>
    where
        F: Fn(T) -> Option<U> + 'a,
    {
        // Unlike `filter`, the closure takes ownership so it can transform the element.
        let f = f.splice_fn1_ctx(&self.location).into();
        Stream::new(
            self.location.clone(),
            HydroNode::FilterMap {
                f,
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                metadata: self
                    .location
                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
            },
        )
    }
768
    /// Generates a stream that maps each input element `i` to a tuple `(i, x)`,
    /// where `x` is the final value of `other`, a bounded [`Singleton`] or [`Optional`].
    /// If `other` is an empty [`Optional`], no values will be produced.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let batch = process
    ///     .source_iter(q!(vec![1, 2, 3, 4]))
    ///     .batch(&tick, nondet!(/** test */));
    /// let count = batch.clone().count(); // `count()` returns a singleton
    /// batch.cross_singleton(count).all_ticks()
    /// # }, |mut stream| async move {
    /// // (1, 4), (2, 4), (3, 4), (4, 4)
    /// # for w in vec![(1, 4), (2, 4), (3, 4), (4, 4)] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn cross_singleton<O2>(
        self,
        other: impl Into<Optional<O2, L, Bounded>>,
    ) -> Stream<(T, O2), L, B, O, R>
    where
        O2: Clone,
    {
        // Accepts `Singleton` via `Into<Optional<...>>`; both sides must share a location.
        let other: Optional<O2, L, Bounded> = other.into();
        check_matching_location(&self.location, &other.location);

        Stream::new(
            self.location.clone(),
            HydroNode::CrossSingleton {
                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
                metadata: self
                    .location
                    .new_node_metadata(Stream::<(T, O2), L, B, O, R>::collection_kind()),
            },
        )
    }
814
    /// Passes this stream through if the boolean signal is `true`, otherwise the output is empty.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// // ticks are lazy by default, forces the second tick to run
    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
    ///
    /// let signal = tick.optional_first_tick(q!(())).is_some(); // true on tick 1, false on tick 2
    /// let batch_first_tick = process
    ///     .source_iter(q!(vec![1, 2, 3, 4]))
    ///     .batch(&tick, nondet!(/** test */));
    /// let batch_second_tick = process
    ///     .source_iter(q!(vec![5, 6, 7, 8]))
    ///     .batch(&tick, nondet!(/** test */))
    ///     .defer_tick();
    /// batch_first_tick.chain(batch_second_tick)
    ///     .filter_if(signal)
    ///     .all_ticks()
    /// # }, |mut stream| async move {
    /// // [1, 2, 3, 4]
    /// # for w in vec![1, 2, 3, 4] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn filter_if(self, signal: Singleton<bool, L, Bounded>) -> Stream<T, L, B, O, R> {
        // Pair every element with the signal only when the signal is `true` (an empty
        // `Optional` otherwise yields nothing), then drop the signal component.
        self.cross_singleton(signal.filter(q!(|b| *b)))
            .map(q!(|(d, _)| d))
    }
850
    /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]) is non-null, otherwise the output is empty.
    ///
    /// Useful for gating the release of elements based on a condition, such as only processing requests if you are the
    /// leader of a cluster.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// // ticks are lazy by default, forces the second tick to run
    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
    ///
    /// let batch_first_tick = process
    ///     .source_iter(q!(vec![1, 2, 3, 4]))
    ///     .batch(&tick, nondet!(/** test */));
    /// let batch_second_tick = process
    ///     .source_iter(q!(vec![5, 6, 7, 8]))
    ///     .batch(&tick, nondet!(/** test */))
    ///     .defer_tick(); // appears on the second tick
    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
    /// batch_first_tick.chain(batch_second_tick)
    ///     .filter_if_some(some_on_first_tick)
    ///     .all_ticks()
    /// # }, |mut stream| async move {
    /// // [1, 2, 3, 4]
    /// # for w in vec![1, 2, 3, 4] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
    pub fn filter_if_some<U>(self, signal: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
        // Deprecated wrapper: reduce the optional to a boolean presence signal.
        self.filter_if(signal.is_some())
    }
889
    /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]) is null, otherwise the output is empty.
    ///
    /// Useful for gating the release of elements based on a condition, such as triggering a protocol if you are missing
    /// some local state.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// // ticks are lazy by default, forces the second tick to run
    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
    ///
    /// let batch_first_tick = process
    ///     .source_iter(q!(vec![1, 2, 3, 4]))
    ///     .batch(&tick, nondet!(/** test */));
    /// let batch_second_tick = process
    ///     .source_iter(q!(vec![5, 6, 7, 8]))
    ///     .batch(&tick, nondet!(/** test */))
    ///     .defer_tick(); // appears on the second tick
    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
    /// batch_first_tick.chain(batch_second_tick)
    ///     .filter_if_none(some_on_first_tick)
    ///     .all_ticks()
    /// # }, |mut stream| async move {
    /// // [5, 6, 7, 8]
    /// # for w in vec![5, 6, 7, 8] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    #[deprecated(note = "use `filter_if` with `!Optional::is_some()` instead")]
    pub fn filter_if_none<U>(self, other: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
        // Deprecated wrapper: reduce the optional to a boolean absence signal.
        self.filter_if(other.is_none())
    }
928
929 /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams, returning all
930 /// tupled pairs in a non-deterministic order.
931 ///
932 /// # Example
933 /// ```rust
934 /// # #[cfg(feature = "deploy")] {
935 /// # use hydro_lang::prelude::*;
936 /// # use std::collections::HashSet;
937 /// # use futures::StreamExt;
938 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
939 /// let tick = process.tick();
940 /// let stream1 = process.source_iter(q!(vec!['a', 'b', 'c']));
941 /// let stream2 = process.source_iter(q!(vec![1, 2, 3]));
942 /// stream1.cross_product(stream2)
943 /// # }, |mut stream| async move {
944 /// # let expected = HashSet::from([('a', 1), ('b', 1), ('c', 1), ('a', 2), ('b', 2), ('c', 2), ('a', 3), ('b', 3), ('c', 3)]);
945 /// # stream.map(|i| assert!(expected.contains(&i)));
946 /// # }));
947 /// # }
948 /// ```
949 pub fn cross_product<T2, O2: Ordering>(
950 self,
951 other: Stream<T2, L, B, O2, R>,
952 ) -> Stream<(T, T2), L, B, NoOrder, R>
953 where
954 T: Clone,
955 T2: Clone,
956 {
957 check_matching_location(&self.location, &other.location);
958
959 Stream::new(
960 self.location.clone(),
961 HydroNode::CrossProduct {
962 left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
963 right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
964 metadata: self
965 .location
966 .new_node_metadata(Stream::<(T, T2), L, B, NoOrder, R>::collection_kind()),
967 },
968 )
969 }
970
971 /// Takes one stream as input and filters out any duplicate occurrences. The output
972 /// contains all unique values from the input.
973 ///
974 /// # Example
975 /// ```rust
976 /// # #[cfg(feature = "deploy")] {
977 /// # use hydro_lang::prelude::*;
978 /// # use futures::StreamExt;
979 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
980 /// let tick = process.tick();
981 /// process.source_iter(q!(vec![1, 2, 3, 2, 1, 4])).unique()
982 /// # }, |mut stream| async move {
983 /// # for w in vec![1, 2, 3, 4] {
984 /// # assert_eq!(stream.next().await.unwrap(), w);
985 /// # }
986 /// # }));
987 /// # }
988 /// ```
989 pub fn unique(self) -> Stream<T, L, B, O, ExactlyOnce>
990 where
991 T: Eq + Hash,
992 {
993 Stream::new(
994 self.location.clone(),
995 HydroNode::Unique {
996 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
997 metadata: self
998 .location
999 .new_node_metadata(Stream::<T, L, B, O, ExactlyOnce>::collection_kind()),
1000 },
1001 )
1002 }
1003
1004 /// Outputs everything in this stream that is *not* contained in the `other` stream.
1005 ///
1006 /// The `other` stream must be [`Bounded`], since this function will wait until
1007 /// all its elements are available before producing any output.
1008 /// # Example
1009 /// ```rust
1010 /// # #[cfg(feature = "deploy")] {
1011 /// # use hydro_lang::prelude::*;
1012 /// # use futures::StreamExt;
1013 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1014 /// let tick = process.tick();
1015 /// let stream = process
1016 /// .source_iter(q!(vec![ 1, 2, 3, 4 ]))
1017 /// .batch(&tick, nondet!(/** test */));
1018 /// let batch = process
1019 /// .source_iter(q!(vec![1, 2]))
1020 /// .batch(&tick, nondet!(/** test */));
1021 /// stream.filter_not_in(batch).all_ticks()
1022 /// # }, |mut stream| async move {
1023 /// # for w in vec![3, 4] {
1024 /// # assert_eq!(stream.next().await.unwrap(), w);
1025 /// # }
1026 /// # }));
1027 /// # }
1028 /// ```
1029 pub fn filter_not_in<O2: Ordering, B2>(self, other: Stream<T, L, B2, O2, R>) -> Self
1030 where
1031 T: Eq + Hash,
1032 B2: IsBounded,
1033 {
1034 check_matching_location(&self.location, &other.location);
1035
1036 Stream::new(
1037 self.location.clone(),
1038 HydroNode::Difference {
1039 pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1040 neg: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
1041 metadata: self
1042 .location
1043 .new_node_metadata(Stream::<T, L, Bounded, O, R>::collection_kind()),
1044 },
1045 )
1046 }
1047
1048 /// An operator which allows you to "inspect" each element of a stream without
1049 /// modifying it. The closure `f` is called on a reference to each item. This is
1050 /// mainly useful for debugging, and should not be used to generate side-effects.
1051 ///
1052 /// # Example
1053 /// ```rust
1054 /// # #[cfg(feature = "deploy")] {
1055 /// # use hydro_lang::prelude::*;
1056 /// # use futures::StreamExt;
1057 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1058 /// let nums = process.source_iter(q!(vec![1, 2]));
1059 /// // prints "1 * 10 = 10" and "2 * 10 = 20"
1060 /// nums.inspect(q!(|x| println!("{} * 10 = {}", x, x * 10)))
1061 /// # }, |mut stream| async move {
1062 /// # for w in vec![1, 2] {
1063 /// # assert_eq!(stream.next().await.unwrap(), w);
1064 /// # }
1065 /// # }));
1066 /// # }
1067 /// ```
1068 pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
1069 where
1070 F: Fn(&T) + 'a,
1071 {
1072 let f = f.splice_fn1_borrow_ctx(&self.location).into();
1073
1074 Stream::new(
1075 self.location.clone(),
1076 HydroNode::Inspect {
1077 f,
1078 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1079 metadata: self.location.new_node_metadata(Self::collection_kind()),
1080 },
1081 )
1082 }
1083
1084 /// Executes the provided closure for every element in this stream.
1085 ///
1086 /// Because the closure may have side effects, the stream must have deterministic order
1087 /// ([`TotalOrder`]) and no retries ([`ExactlyOnce`]). If the side effects can tolerate
1088 /// out-of-order or duplicate execution, use [`Stream::assume_ordering`] and
1089 /// [`Stream::assume_retries`] with an explanation for why this is the case.
1090 pub fn for_each<F: Fn(T) + 'a>(self, f: impl IntoQuotedMut<'a, F, L>)
1091 where
1092 O: IsOrdered,
1093 R: IsExactlyOnce,
1094 {
1095 let f = f.splice_fn1_ctx(&self.location).into();
1096 self.location
1097 .flow_state()
1098 .borrow_mut()
1099 .push_root(HydroRoot::ForEach {
1100 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1101 f,
1102 op_metadata: HydroIrOpMetadata::new(),
1103 });
1104 }
1105
1106 /// Sends all elements of this stream to a provided [`futures::Sink`], such as an external
1107 /// TCP socket to some other server. You should _not_ use this API for interacting with
1108 /// external clients, instead see [`Location::bidi_external_many_bytes`] and
1109 /// [`Location::bidi_external_many_bincode`]. This should be used for custom, low-level
1110 /// interaction with asynchronous sinks.
1111 pub fn dest_sink<S>(self, sink: impl QuotedWithContext<'a, S, L>)
1112 where
1113 O: IsOrdered,
1114 R: IsExactlyOnce,
1115 S: 'a + futures::Sink<T> + Unpin,
1116 {
1117 self.location
1118 .flow_state()
1119 .borrow_mut()
1120 .push_root(HydroRoot::DestSink {
1121 sink: sink.splice_typed_ctx(&self.location).into(),
1122 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1123 op_metadata: HydroIrOpMetadata::new(),
1124 });
1125 }
1126
1127 /// Maps each element `x` of the stream to `(i, x)`, where `i` is the index of the element.
1128 ///
1129 /// # Example
1130 /// ```rust
1131 /// # #[cfg(feature = "deploy")] {
1132 /// # use hydro_lang::{prelude::*, live_collections::stream::{TotalOrder, ExactlyOnce}};
1133 /// # use futures::StreamExt;
1134 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, TotalOrder, ExactlyOnce>(|process| {
1135 /// let tick = process.tick();
1136 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1137 /// numbers.enumerate()
1138 /// # }, |mut stream| async move {
1139 /// // (0, 1), (1, 2), (2, 3), (3, 4)
1140 /// # for w in vec![(0, 1), (1, 2), (2, 3), (3, 4)] {
1141 /// # assert_eq!(stream.next().await.unwrap(), w);
1142 /// # }
1143 /// # }));
1144 /// # }
1145 /// ```
1146 pub fn enumerate(self) -> Stream<(usize, T), L, B, O, R>
1147 where
1148 O: IsOrdered,
1149 R: IsExactlyOnce,
1150 {
1151 Stream::new(
1152 self.location.clone(),
1153 HydroNode::Enumerate {
1154 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1155 metadata: self.location.new_node_metadata(Stream::<
1156 (usize, T),
1157 L,
1158 B,
1159 TotalOrder,
1160 ExactlyOnce,
1161 >::collection_kind()),
1162 },
1163 )
1164 }
1165
1166 /// Combines elements of the stream into a [`Singleton`], by starting with an intitial value,
1167 /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1168 /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1169 ///
1170 /// Depending on the input stream guarantees, the closure may need to be commutative
1171 /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1172 ///
1173 /// # Example
1174 /// ```rust
1175 /// # #[cfg(feature = "deploy")] {
1176 /// # use hydro_lang::prelude::*;
1177 /// # use futures::StreamExt;
1178 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1179 /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1180 /// words
1181 /// .fold(q!(|| String::new()), q!(|acc, x| acc.push_str(x)))
1182 /// .into_stream()
1183 /// # }, |mut stream| async move {
1184 /// // "HELLOWORLD"
1185 /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1186 /// # }));
1187 /// # }
1188 /// ```
1189 pub fn fold<A, I, F, C, Idemp>(
1190 self,
1191 init: impl IntoQuotedMut<'a, I, L>,
1192 comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
1193 ) -> Singleton<A, L, B>
1194 where
1195 I: Fn() -> A + 'a,
1196 F: Fn(&mut A, T),
1197 C: ValidCommutativityFor<O>,
1198 Idemp: ValidIdempotenceFor<R>,
1199 {
1200 let init = init.splice_fn0_ctx(&self.location).into();
1201 let (comb, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1202 proof.register_proof(&comb);
1203
1204 let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1205 let ordered_etc: Stream<T, L, B> = self.assume_retries(nondet).assume_ordering(nondet);
1206
1207 let core = HydroNode::Fold {
1208 init,
1209 acc: comb.into(),
1210 input: Box::new(ordered_etc.ir_node.replace(HydroNode::Placeholder)),
1211 metadata: ordered_etc
1212 .location
1213 .new_node_metadata(Singleton::<A, L, B>::collection_kind()),
1214 };
1215
1216 Singleton::new(ordered_etc.location.clone(), core)
1217 }
1218
1219 /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1220 /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1221 /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1222 /// reference, so that it can be modified in place.
1223 ///
1224 /// Depending on the input stream guarantees, the closure may need to be commutative
1225 /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1226 ///
1227 /// # Example
1228 /// ```rust
1229 /// # #[cfg(feature = "deploy")] {
1230 /// # use hydro_lang::prelude::*;
1231 /// # use futures::StreamExt;
1232 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1233 /// let bools = process.source_iter(q!(vec![false, true, false]));
1234 /// bools.reduce(q!(|acc, x| *acc |= x)).into_stream()
1235 /// # }, |mut stream| async move {
1236 /// // true
1237 /// # assert_eq!(stream.next().await.unwrap(), true);
1238 /// # }));
1239 /// # }
1240 /// ```
1241 pub fn reduce<F, C, Idemp>(
1242 self,
1243 comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
1244 ) -> Optional<T, L, B>
1245 where
1246 F: Fn(&mut T, T) + 'a,
1247 C: ValidCommutativityFor<O>,
1248 Idemp: ValidIdempotenceFor<R>,
1249 {
1250 let (f, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1251 proof.register_proof(&f);
1252
1253 let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1254 let ordered_etc: Stream<T, L, B> = self.assume_retries(nondet).assume_ordering(nondet);
1255
1256 let core = HydroNode::Reduce {
1257 f: f.into(),
1258 input: Box::new(ordered_etc.ir_node.replace(HydroNode::Placeholder)),
1259 metadata: ordered_etc
1260 .location
1261 .new_node_metadata(Optional::<T, L, B>::collection_kind()),
1262 };
1263
1264 Optional::new(ordered_etc.location.clone(), core)
1265 }
1266
1267 /// Computes the maximum element in the stream as an [`Optional`], which
1268 /// will be empty until the first element in the input arrives.
1269 ///
1270 /// # Example
1271 /// ```rust
1272 /// # #[cfg(feature = "deploy")] {
1273 /// # use hydro_lang::prelude::*;
1274 /// # use futures::StreamExt;
1275 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1276 /// let tick = process.tick();
1277 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1278 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1279 /// batch.max().all_ticks()
1280 /// # }, |mut stream| async move {
1281 /// // 4
1282 /// # assert_eq!(stream.next().await.unwrap(), 4);
1283 /// # }));
1284 /// # }
1285 /// ```
1286 pub fn max(self) -> Optional<T, L, B>
1287 where
1288 T: Ord,
1289 {
1290 self.assume_retries_trusted::<ExactlyOnce>(nondet!(/** max is idempotent */))
1291 .assume_ordering_trusted_bounded::<TotalOrder>(
1292 nondet!(/** max is commutative, but order affects intermediates */),
1293 )
1294 .reduce(q!(|curr, new| {
1295 if new > *curr {
1296 *curr = new;
1297 }
1298 }))
1299 }
1300
1301 /// Computes the minimum element in the stream as an [`Optional`], which
1302 /// will be empty until the first element in the input arrives.
1303 ///
1304 /// # Example
1305 /// ```rust
1306 /// # #[cfg(feature = "deploy")] {
1307 /// # use hydro_lang::prelude::*;
1308 /// # use futures::StreamExt;
1309 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1310 /// let tick = process.tick();
1311 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1312 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1313 /// batch.min().all_ticks()
1314 /// # }, |mut stream| async move {
1315 /// // 1
1316 /// # assert_eq!(stream.next().await.unwrap(), 1);
1317 /// # }));
1318 /// # }
1319 /// ```
1320 pub fn min(self) -> Optional<T, L, B>
1321 where
1322 T: Ord,
1323 {
1324 self.assume_retries_trusted::<ExactlyOnce>(nondet!(/** min is idempotent */))
1325 .assume_ordering_trusted_bounded::<TotalOrder>(
1326 nondet!(/** max is commutative, but order affects intermediates */),
1327 )
1328 .reduce(q!(|curr, new| {
1329 if new < *curr {
1330 *curr = new;
1331 }
1332 }))
1333 }
1334
1335 /// Computes the first element in the stream as an [`Optional`], which
1336 /// will be empty until the first element in the input arrives.
1337 ///
1338 /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1339 /// re-ordering of elements may cause the first element to change.
1340 ///
1341 /// # Example
1342 /// ```rust
1343 /// # #[cfg(feature = "deploy")] {
1344 /// # use hydro_lang::prelude::*;
1345 /// # use futures::StreamExt;
1346 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1347 /// let tick = process.tick();
1348 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1349 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1350 /// batch.first().all_ticks()
1351 /// # }, |mut stream| async move {
1352 /// // 1
1353 /// # assert_eq!(stream.next().await.unwrap(), 1);
1354 /// # }));
1355 /// # }
1356 /// ```
1357 pub fn first(self) -> Optional<T, L, B>
1358 where
1359 O: IsOrdered,
1360 {
1361 self.make_totally_ordered()
1362 .assume_retries_trusted::<ExactlyOnce>(nondet!(/** first is idempotent */))
1363 .reduce(q!(|_, _| {}))
1364 }
1365
1366 /// Computes the last element in the stream as an [`Optional`], which
1367 /// will be empty until an element in the input arrives.
1368 ///
1369 /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1370 /// re-ordering of elements may cause the last element to change.
1371 ///
1372 /// # Example
1373 /// ```rust
1374 /// # #[cfg(feature = "deploy")] {
1375 /// # use hydro_lang::prelude::*;
1376 /// # use futures::StreamExt;
1377 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1378 /// let tick = process.tick();
1379 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1380 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1381 /// batch.last().all_ticks()
1382 /// # }, |mut stream| async move {
1383 /// // 4
1384 /// # assert_eq!(stream.next().await.unwrap(), 4);
1385 /// # }));
1386 /// # }
1387 /// ```
1388 pub fn last(self) -> Optional<T, L, B>
1389 where
1390 O: IsOrdered,
1391 {
1392 self.make_totally_ordered()
1393 .assume_retries_trusted::<ExactlyOnce>(nondet!(/** last is idempotent */))
1394 .reduce(q!(|curr, new| *curr = new))
1395 }
1396
1397 /// Collects all the elements of this stream into a single [`Vec`] element.
1398 ///
1399 /// If the input stream is [`Unbounded`], the output [`Singleton`] will be [`Unbounded`] as
1400 /// well, which means that the value of the [`Vec`] will asynchronously grow as new elements
1401 /// are added. On such a value, you can use [`Singleton::snapshot`] to grab an instance of
1402 /// the vector at an arbitrary point in time.
1403 ///
1404 /// # Example
1405 /// ```rust
1406 /// # #[cfg(feature = "deploy")] {
1407 /// # use hydro_lang::prelude::*;
1408 /// # use futures::StreamExt;
1409 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1410 /// let tick = process.tick();
1411 /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1412 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1413 /// batch.collect_vec().all_ticks() // emit each tick's Vec into an unbounded stream
1414 /// # }, |mut stream| async move {
1415 /// // [ vec![1, 2, 3, 4] ]
1416 /// # for w in vec![vec![1, 2, 3, 4]] {
1417 /// # assert_eq!(stream.next().await.unwrap(), w);
1418 /// # }
1419 /// # }));
1420 /// # }
1421 /// ```
1422 pub fn collect_vec(self) -> Singleton<Vec<T>, L, B>
1423 where
1424 O: IsOrdered,
1425 R: IsExactlyOnce,
1426 {
1427 self.make_totally_ordered().make_exactly_once().fold(
1428 q!(|| vec![]),
1429 q!(|acc, v| {
1430 acc.push(v);
1431 }),
1432 )
1433 }
1434
1435 /// Applies a function to each element of the stream, maintaining an internal state (accumulator)
1436 /// and emitting each intermediate result.
1437 ///
1438 /// Unlike `fold` which only returns the final accumulated value, `scan` produces a new stream
1439 /// containing all intermediate accumulated values. The scan operation can also terminate early
1440 /// by returning `None`.
1441 ///
1442 /// The function takes a mutable reference to the accumulator and the current element, and returns
1443 /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
1444 /// If the function returns `None`, the stream is terminated and no more elements are processed.
1445 ///
1446 /// # Examples
1447 ///
1448 /// Basic usage - running sum:
1449 /// ```rust
1450 /// # #[cfg(feature = "deploy")] {
1451 /// # use hydro_lang::prelude::*;
1452 /// # use futures::StreamExt;
1453 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1454 /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1455 /// q!(|| 0),
1456 /// q!(|acc, x| {
1457 /// *acc += x;
1458 /// Some(*acc)
1459 /// }),
1460 /// )
1461 /// # }, |mut stream| async move {
1462 /// // Output: 1, 3, 6, 10
1463 /// # for w in vec![1, 3, 6, 10] {
1464 /// # assert_eq!(stream.next().await.unwrap(), w);
1465 /// # }
1466 /// # }));
1467 /// # }
1468 /// ```
1469 ///
1470 /// Early termination example:
1471 /// ```rust
1472 /// # #[cfg(feature = "deploy")] {
1473 /// # use hydro_lang::prelude::*;
1474 /// # use futures::StreamExt;
1475 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1476 /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1477 /// q!(|| 1),
1478 /// q!(|state, x| {
1479 /// *state = *state * x;
1480 /// if *state > 6 {
1481 /// None // Terminate the stream
1482 /// } else {
1483 /// Some(-*state)
1484 /// }
1485 /// }),
1486 /// )
1487 /// # }, |mut stream| async move {
1488 /// // Output: -1, -2, -6
1489 /// # for w in vec![-1, -2, -6] {
1490 /// # assert_eq!(stream.next().await.unwrap(), w);
1491 /// # }
1492 /// # }));
1493 /// # }
1494 /// ```
1495 pub fn scan<A, U, I, F>(
1496 self,
1497 init: impl IntoQuotedMut<'a, I, L>,
1498 f: impl IntoQuotedMut<'a, F, L>,
1499 ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1500 where
1501 O: IsOrdered,
1502 R: IsExactlyOnce,
1503 I: Fn() -> A + 'a,
1504 F: Fn(&mut A, T) -> Option<U> + 'a,
1505 {
1506 let init = init.splice_fn0_ctx(&self.location).into();
1507 let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();
1508
1509 Stream::new(
1510 self.location.clone(),
1511 HydroNode::Scan {
1512 init,
1513 acc: f,
1514 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1515 metadata: self.location.new_node_metadata(
1516 Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
1517 ),
1518 },
1519 )
1520 }
1521
    /// Iteratively processes the elements of the stream using a state machine that can yield
    /// elements as it processes its inputs. This is designed to mirror the unstable generator
    /// syntax in Rust, without requiring special syntax.
    ///
    /// Like [`Stream::scan`], this function takes in an initializer that emits the initial
    /// state. The second argument defines the processing logic, taking in a mutable reference
    /// to the state and the value to be processed. It emits a [`Generate`] value, whose
    /// variants define what is emitted and whether further inputs should be processed.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process.source_iter(q!(vec![1, 3, 100, 10])).generator(
    ///     q!(|| 0),
    ///     q!(|acc, x| {
    ///         *acc += x;
    ///         if *acc > 100 {
    ///             hydro_lang::live_collections::keyed_stream::Generate::Return("done!".to_owned())
    ///         } else if *acc % 2 == 0 {
    ///             hydro_lang::live_collections::keyed_stream::Generate::Yield("even".to_owned())
    ///         } else {
    ///             hydro_lang::live_collections::keyed_stream::Generate::Continue
    ///         }
    ///     }),
    /// )
    /// # }, |mut stream| async move {
    /// // Output: "even", "done!"
    /// # let mut results = Vec::new();
    /// # for _ in 0..2 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec!["done!".to_owned(), "even".to_owned()]);
    /// # }));
    /// # }
    /// ```
    pub fn generator<A, U, I, F>(
        self,
        init: impl IntoQuotedMut<'a, I, L> + Copy,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
    where
        O: IsOrdered,
        R: IsExactlyOnce,
        I: Fn() -> A + 'a,
        F: Fn(&mut A, T) -> Generate<U> + 'a,
    {
        // Wrap the user closures in `ManualExpr` so splicing is deferred until the
        // location context is known; this lets them be captured (via `move`) inside
        // the quoted scan closure below.
        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));

        let this = self.make_totally_ordered().make_exactly_once();

        // State is Option<Option<A>>:
        // None = not yet initialized
        // Some(Some(a)) = active with state a
        // Some(None) = terminated
        let scan_init = q!(|| None)
            .splice_fn0_ctx::<Option<Option<A>>>(&this.location)
            .into();
        // The scan closure produces Option<Option<U>>: the outer None terminates
        // the scan entirely, while the inner Option is flattened away below so
        // that `Some(None)` emits no element (used for `Continue`).
        let scan_f = q!(move |state: &mut Option<Option<_>>, v| {
            if state.is_none() {
                *state = Some(Some(init()));
            }
            match state {
                Some(Some(state_value)) => match f(state_value, v) {
                    Generate::Yield(out) => Some(Some(out)),
                    Generate::Return(out) => {
                        *state = Some(None);
                        Some(Some(out))
                    }
                    // Unlike KeyedStream, we can terminate the scan directly on
                    // Break/Return because there is only one state (no other keys
                    // that still need processing).
                    Generate::Break => None,
                    Generate::Continue => Some(None),
                },
                // State is Some(None) after Return; terminate the scan.
                _ => None,
            }
        })
        .splice_fn2_borrow_mut_ctx::<Option<Option<A>>, T, _>(&this.location)
        .into();

        let scan_node = HydroNode::Scan {
            init: scan_init,
            acc: scan_f,
            input: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
            metadata: this.location.new_node_metadata(Stream::<
                Option<U>,
                L,
                B,
                TotalOrder,
                ExactlyOnce,
            >::collection_kind()),
        };

        // Identity flat-map over Option<U> drops the `None` markers emitted for
        // `Continue`, leaving only the yielded/returned values.
        let flatten_f = q!(|d| d)
            .splice_fn1_ctx::<Option<U>, _>(&this.location)
            .into();
        let flatten_node = HydroNode::FlatMap {
            f: flatten_f,
            input: Box::new(scan_node),
            metadata: this
                .location
                .new_node_metadata(Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind()),
        };

        Stream::new(this.location.clone(), flatten_node)
    }
1634
1635 /// Given a time interval, returns a stream corresponding to samples taken from the
1636 /// stream roughly at that interval. The output will have elements in the same order
1637 /// as the input, but with arbitrary elements skipped between samples. There is also
1638 /// no guarantee on the exact timing of the samples.
1639 ///
1640 /// # Non-Determinism
1641 /// The output stream is non-deterministic in which elements are sampled, since this
1642 /// is controlled by a clock.
1643 pub fn sample_every(
1644 self,
1645 interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1646 nondet: NonDet,
1647 ) -> Stream<T, L, Unbounded, O, AtLeastOnce>
1648 where
1649 L: NoTick + NoAtomic,
1650 {
1651 let samples = self.location.source_interval(interval, nondet);
1652
1653 let tick = self.location.tick();
1654 self.batch(&tick, nondet)
1655 .filter_if(samples.batch(&tick, nondet).first().is_some())
1656 .all_ticks()
1657 .weaken_retries()
1658 }
1659
    /// Given a timeout duration, returns an [`Optional`] which will have a value if the
    /// stream has not emitted a value since that duration.
    ///
    /// # Non-Determinism
    /// Timeout relies on non-deterministic sampling of the stream, so depending on when
    /// samples take place, timeouts may be non-deterministically generated or missed,
    /// and the notification of the timeout may be delayed as well. There is also no
    /// guarantee on how long the [`Optional`] will have a value after the timeout is
    /// detected based on when the next sample is taken.
    pub fn timeout(
        self,
        duration: impl QuotedWithContext<'a, std::time::Duration, Tick<L>> + Copy + 'a,
        nondet: NonDet,
    ) -> Optional<(), L, Unbounded>
    where
        L: NoTick + NoAtomic,
    {
        let tick = self.location.tick();

        // Track the arrival time of the most recent element; `None` until the
        // first element arrives. Retries are assumed away because overwriting the
        // timestamp with a fresh `Instant::now()` tolerates duplicate deliveries.
        let latest_received = self.assume_retries::<ExactlyOnce>(nondet).fold(
            q!(|| None),
            q!(
                |latest, _| {
                    *latest = Some(Instant::now());
                },
                commutative = manual_proof!(/** TODO */)
            ),
        );

        // Each tick, compare a snapshot of the latest-arrival time against the
        // timeout. Note that an empty history (no element seen yet) also yields
        // `Some(())`, so the timeout is considered active before the first element.
        latest_received
            .snapshot(&tick, nondet)
            .filter_map(q!(move |latest_received| {
                if let Some(latest_received) = latest_received {
                    if Instant::now().duration_since(latest_received) > duration {
                        Some(())
                    } else {
                        None
                    }
                } else {
                    Some(())
                }
            }))
            .latest()
    }
1704
1705 /// Shifts this stream into an atomic context, which guarantees that any downstream logic
1706 /// will all be executed synchronously before any outputs are yielded (in [`Stream::end_atomic`]).
1707 ///
1708 /// This is useful to enforce local consistency constraints, such as ensuring that a write is
1709 /// processed before an acknowledgement is emitted.
1710 pub fn atomic(self) -> Stream<T, Atomic<L>, B, O, R> {
1711 let id = self.location.flow_state().borrow_mut().next_clock_id();
1712 let out_location = Atomic {
1713 tick: Tick {
1714 id,
1715 l: self.location.clone(),
1716 },
1717 };
1718 Stream::new(
1719 out_location.clone(),
1720 HydroNode::BeginAtomic {
1721 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1722 metadata: out_location
1723 .new_node_metadata(Stream::<T, Atomic<L>, B, O, R>::collection_kind()),
1724 },
1725 )
1726 }
1727
1728 /// Given a tick, returns a stream corresponding to a batch of elements segmented by
1729 /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
1730 /// the order of the input. The output stream will execute in the [`Tick`] that was
1731 /// used to create the atomic section.
1732 ///
1733 /// # Non-Determinism
1734 /// The batch boundaries are non-deterministic and may change across executions.
1735 pub fn batch(self, tick: &Tick<L>, _nondet: NonDet) -> Stream<T, Tick<L>, Bounded, O, R> {
1736 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1737 Stream::new(
1738 tick.clone(),
1739 HydroNode::Batch {
1740 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1741 metadata: tick
1742 .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
1743 },
1744 )
1745 }
1746
1747 /// An operator which allows you to "name" a `HydroNode`.
1748 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
1749 pub fn ir_node_named(self, name: &str) -> Stream<T, L, B, O, R> {
1750 {
1751 let mut node = self.ir_node.borrow_mut();
1752 let metadata = node.metadata_mut();
1753 metadata.tag = Some(name.to_owned());
1754 }
1755 self
1756 }
1757
1758 /// Explicitly "casts" the stream to a type with a different ordering
1759 /// guarantee. Useful in unsafe code where the ordering cannot be proven
1760 /// by the type-system.
1761 ///
1762 /// # Non-Determinism
1763 /// This function is used as an escape hatch, and any mistakes in the
1764 /// provided ordering guarantee will propagate into the guarantees
1765 /// for the rest of the program.
1766 pub fn assume_ordering<O2: Ordering>(self, _nondet: NonDet) -> Stream<T, L, B, O2, R> {
1767 if O::ORDERING_KIND == O2::ORDERING_KIND {
1768 Stream::new(
1769 self.location.clone(),
1770 self.ir_node.replace(HydroNode::Placeholder),
1771 )
1772 } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
1773 // We can always weaken the ordering guarantee
1774 Stream::new(
1775 self.location.clone(),
1776 HydroNode::Cast {
1777 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1778 metadata: self
1779 .location
1780 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
1781 },
1782 )
1783 } else {
1784 Stream::new(
1785 self.location.clone(),
1786 HydroNode::ObserveNonDet {
1787 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1788 trusted: false,
1789 metadata: self
1790 .location
1791 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
1792 },
1793 )
1794 }
1795 }
1796
1797 // like `assume_ordering_trusted`, but only if the input stream is bounded and therefore
1798 // intermediate states will not be revealed
1799 fn assume_ordering_trusted_bounded<O2: Ordering>(
1800 self,
1801 nondet: NonDet,
1802 ) -> Stream<T, L, B, O2, R> {
1803 if B::BOUNDED {
1804 self.assume_ordering_trusted(nondet)
1805 } else {
1806 self.assume_ordering(nondet)
1807 }
1808 }
1809
1810 // only for internal APIs that have been carefully vetted to ensure that the non-determinism
1811 // is not observable
1812 pub(crate) fn assume_ordering_trusted<O2: Ordering>(
1813 self,
1814 _nondet: NonDet,
1815 ) -> Stream<T, L, B, O2, R> {
1816 if O::ORDERING_KIND == O2::ORDERING_KIND {
1817 Stream::new(
1818 self.location.clone(),
1819 self.ir_node.replace(HydroNode::Placeholder),
1820 )
1821 } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
1822 // We can always weaken the ordering guarantee
1823 Stream::new(
1824 self.location.clone(),
1825 HydroNode::Cast {
1826 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1827 metadata: self
1828 .location
1829 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
1830 },
1831 )
1832 } else {
1833 Stream::new(
1834 self.location.clone(),
1835 HydroNode::ObserveNonDet {
1836 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1837 trusted: true,
1838 metadata: self
1839 .location
1840 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
1841 },
1842 )
1843 }
1844 }
1845
1846 #[deprecated = "use `weaken_ordering::<NoOrder>()` instead"]
1847 /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
1848 /// which is always safe because that is the weakest possible guarantee.
1849 pub fn weakest_ordering(self) -> Stream<T, L, B, NoOrder, R> {
1850 self.weaken_ordering::<NoOrder>()
1851 }
1852
1853 /// Weakens the ordering guarantee provided by the stream to `O2`, with the type-system
1854 /// enforcing that `O2` is weaker than the input ordering guarantee.
1855 pub fn weaken_ordering<O2: WeakerOrderingThan<O>>(self) -> Stream<T, L, B, O2, R> {
1856 let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
1857 self.assume_ordering::<O2>(nondet)
1858 }
1859
1860 /// Strengthens the ordering guarantee to `TotalOrder`, given that `O: IsOrdered`, which
1861 /// implies that `O == TotalOrder`.
1862 pub fn make_totally_ordered(self) -> Stream<T, L, B, TotalOrder, R>
1863 where
1864 O: IsOrdered,
1865 {
1866 self.assume_ordering(nondet!(/** no-op */))
1867 }
1868
1869 /// Explicitly "casts" the stream to a type with a different retries
1870 /// guarantee. Useful in unsafe code where the lack of retries cannot
1871 /// be proven by the type-system.
1872 ///
1873 /// # Non-Determinism
1874 /// This function is used as an escape hatch, and any mistakes in the
1875 /// provided retries guarantee will propagate into the guarantees
1876 /// for the rest of the program.
1877 pub fn assume_retries<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
1878 if R::RETRIES_KIND == R2::RETRIES_KIND {
1879 Stream::new(
1880 self.location.clone(),
1881 self.ir_node.replace(HydroNode::Placeholder),
1882 )
1883 } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
1884 // We can always weaken the retries guarantee
1885 Stream::new(
1886 self.location.clone(),
1887 HydroNode::Cast {
1888 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1889 metadata: self
1890 .location
1891 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1892 },
1893 )
1894 } else {
1895 Stream::new(
1896 self.location.clone(),
1897 HydroNode::ObserveNonDet {
1898 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1899 trusted: false,
1900 metadata: self
1901 .location
1902 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1903 },
1904 )
1905 }
1906 }
1907
1908 // only for internal APIs that have been carefully vetted to ensure that the non-determinism
1909 // is not observable
1910 fn assume_retries_trusted<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
1911 if R::RETRIES_KIND == R2::RETRIES_KIND {
1912 Stream::new(
1913 self.location.clone(),
1914 self.ir_node.replace(HydroNode::Placeholder),
1915 )
1916 } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
1917 // We can always weaken the retries guarantee
1918 Stream::new(
1919 self.location.clone(),
1920 HydroNode::Cast {
1921 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1922 metadata: self
1923 .location
1924 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1925 },
1926 )
1927 } else {
1928 Stream::new(
1929 self.location.clone(),
1930 HydroNode::ObserveNonDet {
1931 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1932 trusted: true,
1933 metadata: self
1934 .location
1935 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
1936 },
1937 )
1938 }
1939 }
1940
1941 #[deprecated = "use `weaken_retries::<AtLeastOnce>()` instead"]
1942 /// Weakens the retries guarantee provided by the stream to [`AtLeastOnce`],
1943 /// which is always safe because that is the weakest possible guarantee.
1944 pub fn weakest_retries(self) -> Stream<T, L, B, O, AtLeastOnce> {
1945 self.weaken_retries::<AtLeastOnce>()
1946 }
1947
1948 /// Weakens the retries guarantee provided by the stream to `R2`, with the type-system
1949 /// enforcing that `R2` is weaker than the input retries guarantee.
1950 pub fn weaken_retries<R2: WeakerRetryThan<R>>(self) -> Stream<T, L, B, O, R2> {
1951 let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
1952 self.assume_retries::<R2>(nondet)
1953 }
1954
1955 /// Strengthens the retry guarantee to `ExactlyOnce`, given that `R: IsExactlyOnce`, which
1956 /// implies that `R == ExactlyOnce`.
1957 pub fn make_exactly_once(self) -> Stream<T, L, B, O, ExactlyOnce>
1958 where
1959 R: IsExactlyOnce,
1960 {
1961 self.assume_retries(nondet!(/** no-op */))
1962 }
1963
1964 /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
1965 /// implies that `B == Bounded`.
1966 pub fn make_bounded(self) -> Stream<T, L, Bounded, O, R>
1967 where
1968 B: IsBounded,
1969 {
1970 Stream::new(
1971 self.location.clone(),
1972 self.ir_node.replace(HydroNode::Placeholder),
1973 )
1974 }
1975}
1976
impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<&T, L, B, O, R>
where
    L: Location<'a>,
{
    /// Clone each element of the stream; akin to `map(q!(|d| d.clone()))`.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process.source_iter(q!(&[1, 2, 3])).cloned()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3
    /// # for w in vec![1, 2, 3] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn cloned(self) -> Stream<T, L, B, O, R>
    where
        T: Clone,
    {
        // The clone happens inside quoted user code; boundedness, ordering, and
        // retries markers all pass through unchanged (only `&T` becomes `T`).
        self.map(q!(|d| d.clone()))
    }
}
2005
impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>
where
    L: Location<'a>,
{
    /// Computes the number of elements in the stream as a [`Singleton`].
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
    /// let batch = numbers.batch(&tick, nondet!(/** test */));
    /// batch.count().all_ticks()
    /// # }, |mut stream| async move {
    /// // 4
    /// # assert_eq!(stream.next().await.unwrap(), 4);
    /// # }));
    /// # }
    /// ```
    pub fn count(self) -> Singleton<usize, L, B> {
        // Counting is insensitive to element order, so the internal upgrade to
        // `TotalOrder` is safe; the `ExactlyOnce` bound on this impl prevents
        // retries from inflating the count.
        self.assume_ordering_trusted::<TotalOrder>(nondet!(
            /// Order does not affect eventual count, and also does not affect intermediate states.
        ))
        .fold(q!(|| 0usize), q!(|count, _| *count += 1))
    }
}
2035
2036impl<'a, T, L: Location<'a> + NoTick, O: Ordering, R: Retries> Stream<T, L, Unbounded, O, R> {
2037 /// Produces a new stream that merges the elements of the two input streams.
2038 /// The result has [`NoOrder`] because the order of merging is not guaranteed.
2039 ///
2040 /// Currently, both input streams must be [`Unbounded`]. When the streams are
2041 /// [`Bounded`], you can use [`Stream::chain`] instead.
2042 ///
2043 /// # Example
2044 /// ```rust
2045 /// # #[cfg(feature = "deploy")] {
2046 /// # use hydro_lang::prelude::*;
2047 /// # use futures::StreamExt;
2048 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2049 /// let numbers: Stream<i32, _, Unbounded> = // 1, 2, 3, 4
2050 /// # process.source_iter(q!(vec![1, 2, 3, 4])).into();
2051 /// numbers.clone().map(q!(|x| x + 1)).merge_unordered(numbers)
2052 /// # }, |mut stream| async move {
2053 /// // 2, 3, 4, 5, and 1, 2, 3, 4 merged in unknown order
2054 /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
2055 /// # assert_eq!(stream.next().await.unwrap(), w);
2056 /// # }
2057 /// # }));
2058 /// # }
2059 /// ```
2060 pub fn merge_unordered<O2: Ordering, R2: Retries>(
2061 self,
2062 other: Stream<T, L, Unbounded, O2, R2>,
2063 ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2064 where
2065 R: MinRetries<R2>,
2066 {
2067 Stream::new(
2068 self.location.clone(),
2069 HydroNode::Chain {
2070 first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2071 second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2072 metadata: self.location.new_node_metadata(Stream::<
2073 T,
2074 L,
2075 Unbounded,
2076 NoOrder,
2077 <R as MinRetries<R2>>::Min,
2078 >::collection_kind()),
2079 },
2080 )
2081 }
2082
2083 /// Deprecated: use [`Stream::merge_unordered`] instead.
2084 #[deprecated(note = "use `merge_unordered` instead")]
2085 pub fn interleave<O2: Ordering, R2: Retries>(
2086 self,
2087 other: Stream<T, L, Unbounded, O2, R2>,
2088 ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2089 where
2090 R: MinRetries<R2>,
2091 {
2092 self.merge_unordered(other)
2093 }
2094}
2095
2096impl<'a, T, L: Location<'a> + NoTick, R: Retries> Stream<T, L, Unbounded, TotalOrder, R> {
2097 /// Produces a new stream that combines the elements of the two input streams,
2098 /// preserving the relative order of elements within each input.
2099 ///
2100 /// Currently, both input streams must be [`Unbounded`]. When the streams are
2101 /// [`Bounded`], you can use [`Stream::chain`] instead.
2102 ///
2103 /// # Non-Determinism
2104 /// The order in which elements *across* the two streams will be interleaved is
2105 /// non-deterministic, so the order of elements will vary across runs. If the output order
2106 /// is irrelevant, use [`Stream::merge_unordered`] instead, which is deterministic but emits an
2107 /// unordered stream.
2108 ///
2109 /// # Example
2110 /// ```rust
2111 /// # #[cfg(feature = "deploy")] {
2112 /// # use hydro_lang::prelude::*;
2113 /// # use futures::StreamExt;
2114 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2115 /// let numbers: Stream<i32, _, Unbounded> = // 1, 3
2116 /// # process.source_iter(q!(vec![1, 3])).into();
2117 /// numbers.clone().merge_ordered(numbers.map(q!(|x| x + 1)), nondet!(/** example */))
2118 /// # }, |mut stream| async move {
2119 /// // 1, 3 and 2, 4 in some order, preserving the original local order
2120 /// # for w in vec![1, 3, 2, 4] {
2121 /// # assert_eq!(stream.next().await.unwrap(), w);
2122 /// # }
2123 /// # }));
2124 /// # }
2125 /// ```
2126 pub fn merge_ordered<R2: Retries>(
2127 self,
2128 other: Stream<T, L, Unbounded, TotalOrder, R2>,
2129 _nondet: NonDet,
2130 ) -> Stream<T, L, Unbounded, TotalOrder, <R as MinRetries<R2>>::Min>
2131 where
2132 R: MinRetries<R2>,
2133 {
2134 Stream::new(
2135 self.location.clone(),
2136 HydroNode::Chain {
2137 first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2138 second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2139 metadata: self.location.new_node_metadata(Stream::<
2140 T,
2141 L,
2142 Unbounded,
2143 TotalOrder,
2144 <R as MinRetries<R2>>::Min,
2145 >::collection_kind()),
2146 },
2147 )
2148 }
2149}
2150
impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
where
    L: Location<'a>,
{
    /// Produces a new stream that emits the input elements in sorted order.
    ///
    /// The input stream can have any ordering guarantee, but the output stream
    /// will have a [`TotalOrder`] guarantee. This operator will block until all
    /// elements in the input stream are available, so it requires the input stream
    /// to be [`Bounded`].
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![4, 2, 3, 1]));
    /// let batch = numbers.batch(&tick, nondet!(/** test */));
    /// batch.sort().all_ticks()
    /// # }, |mut stream| async move {
    /// // 1, 2, 3, 4
    /// # for w in (1..5) {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn sort(self) -> Stream<T, L, Bounded, TotalOrder, R>
    where
        B: IsBounded,
        T: Ord,
    {
        let this = self.make_bounded();
        Stream::new(
            this.location.clone(),
            HydroNode::Sort {
                input: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
                metadata: this
                    .location
                    .new_node_metadata(Stream::<T, L, Bounded, TotalOrder, R>::collection_kind()),
            },
        )
    }

    /// Produces a new stream that first emits the elements of the `self` stream,
    /// and then emits the elements of the `other` stream. The output stream has
    /// a [`TotalOrder`] guarantee if and only if both input streams have a
    /// [`TotalOrder`] guarantee.
    ///
    /// The `self` stream must be [`Bounded`], since this operator blocks on it until
    /// all of its elements are available. The `other` stream may have any boundedness
    /// `B2`, which is propagated to the output stream.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
    /// let batch = numbers.batch(&tick, nondet!(/** test */));
    /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
    /// # }, |mut stream| async move {
    /// // 2, 3, 4, 5, 1, 2, 3, 4
    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn chain<O2: Ordering, R2: Retries, B2: Boundedness>(
        self,
        other: Stream<T, L, B2, O2, R2>,
    ) -> Stream<T, L, B2, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
    where
        B: IsBounded,
        O: MinOrder<O2>,
        R: MinRetries<R2>,
    {
        check_matching_location(&self.location, &other.location);

        Stream::new(
            self.location.clone(),
            HydroNode::Chain {
                first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
                metadata: self.location.new_node_metadata(Stream::<
                    T,
                    L,
                    B2,
                    <O as MinOrder<O2>>::Min,
                    <R as MinRetries<R2>>::Min,
                >::collection_kind()),
            },
        )
    }

    /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams.
    /// Unlike [`Stream::cross_product`], the output order is totally ordered when the inputs are
    /// because this is compiled into a nested loop.
    ///
    /// The `self` stream must be [`Bounded`] (enforced via `B: IsBounded`), since each of
    /// its elements is paired with every element of `other`.
    pub fn cross_product_nested_loop<T2, O2: Ordering + MinOrder<O>>(
        self,
        other: Stream<T2, L, Bounded, O2, R>,
    ) -> Stream<(T, T2), L, Bounded, <O2 as MinOrder<O>>::Min, R>
    where
        B: IsBounded,
        T: Clone,
        T2: Clone,
    {
        let this = self.make_bounded();
        check_matching_location(&this.location, &other.location);

        Stream::new(
            this.location.clone(),
            HydroNode::CrossProduct {
                left: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
                right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
                metadata: this.location.new_node_metadata(Stream::<
                    (T, T2),
                    L,
                    Bounded,
                    <O2 as MinOrder<O>>::Min,
                    R,
                >::collection_kind()),
            },
        )
    }

    /// Creates a [`KeyedStream`] with the same set of keys as `keys`, but with the elements in
    /// `self` used as the values for *each* key.
    ///
    /// This is helpful when "broadcasting" a set of values so that all the keys have the same
    /// values. For example, it can be used to send the same set of elements to several cluster
    /// members, if the membership information is available as a [`KeyedSingleton`].
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// # let tick = process.tick();
    /// let keyed_singleton = // { 1: (), 2: () }
    /// # process
    /// #     .source_iter(q!(vec![(1, ()), (2, ())]))
    /// #     .into_keyed()
    /// #     .batch(&tick, nondet!(/** test */))
    /// #     .first();
    /// let stream = // [ "a", "b" ]
    /// # process
    /// #     .source_iter(q!(vec!["a".to_owned(), "b".to_owned()]))
    /// #     .batch(&tick, nondet!(/** test */));
    /// stream.repeat_with_keys(keyed_singleton)
    /// # .entries().all_ticks()
    /// # }, |mut stream| async move {
    /// // { 1: ["a", "b" ], 2: ["a", "b"] }
    /// # let mut results = Vec::new();
    /// # for _ in 0..4 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![(1, "a".to_owned()), (1, "b".to_owned()), (2, "a".to_owned()), (2, "b".to_owned())]);
    /// # }));
    /// # }
    /// ```
    pub fn repeat_with_keys<K, V2>(
        self,
        keys: KeyedSingleton<K, V2, L, Bounded>,
    ) -> KeyedStream<K, T, L, Bounded, O, R>
    where
        B: IsBounded,
        K: Clone,
        T: Clone,
    {
        // Pair every key with every value via a nested-loop cross product, then
        // re-group by the key; grouping makes the key ordering unobservable.
        keys.keys()
            .weaken_retries()
            .assume_ordering_trusted::<TotalOrder>(
                nondet!(/** keyed stream does not depend on ordering of keys */),
            )
            .cross_product_nested_loop(self.make_bounded())
            .into_keyed()
    }

    /// Consumes a stream of `Future<T>`, resolving each future while blocking subgraph
    /// execution until all results are available. The output order is based on when futures
    /// complete, and may be different than the input order.
    ///
    /// Unlike [`Stream::resolve_futures`], which allows the subgraph to continue executing
    /// while futures are pending, this variant blocks until the futures resolve.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use std::collections::HashSet;
    /// # use futures::StreamExt;
    /// # use hydro_lang::prelude::*;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process
    ///     .source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
    ///     .map(q!(|x| async move {
    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
    ///         x
    ///     }))
    ///     .resolve_futures_blocking()
    /// # },
    /// # |mut stream| async move {
    /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
    /// # let mut output = HashSet::new();
    /// # for _ in 1..10 {
    /// #     output.insert(stream.next().await.unwrap());
    /// # }
    /// # assert_eq!(
    /// #     output,
    /// #     HashSet::<i32>::from_iter(1..10)
    /// # );
    /// # },
    /// # ));
    /// # }
    /// ```
    pub fn resolve_futures_blocking(self) -> Stream<T::Output, L, B, NoOrder, R>
    where
        T: Future,
    {
        // Output is `NoOrder` because completion order, not arrival order, drives emission.
        Stream::new(
            self.location.clone(),
            HydroNode::ResolveFuturesBlocking {
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                metadata: self
                    .location
                    .new_node_metadata(Stream::<T::Output, L, B, NoOrder, R>::collection_kind()),
            },
        )
    }

    /// Returns a [`Singleton`] containing `true` if the stream has no elements, or `false` otherwise.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let empty: Stream<i32, _, Bounded> = process
    ///     .source_iter(q!(Vec::<i32>::new()))
    ///     .batch(&tick, nondet!(/** test */));
    /// empty.is_empty().all_ticks()
    /// # }, |mut stream| async move {
    /// // true
    /// # assert_eq!(stream.next().await.unwrap(), true);
    /// # }));
    /// # }
    /// ```
    #[expect(clippy::wrong_self_convention, reason = "stream function naming")]
    pub fn is_empty(self) -> Singleton<bool, L, Bounded>
    where
        B: IsBounded,
    {
        // Any element flips the flag to `false`; order and duplicates cannot change
        // the result, which justifies the trusted ordering/retries upgrades below.
        self.make_bounded()
            .assume_ordering_trusted::<TotalOrder>(
                nondet!(/** is_empty intermediates unaffected by order */),
            )
            .assume_retries_trusted::<ExactlyOnce>(nondet!(/** is_empty is idempotent */))
            .fold(q!(|| true), q!(|empty, _| { *empty = false },))
    }
}
2420
impl<'a, K, V1, L, B: Boundedness, O: Ordering, R: Retries> Stream<(K, V1), L, B, O, R>
where
    L: Location<'a>,
{
    #[expect(clippy::type_complexity, reason = "ordering / retries propagation")]
    /// Given two streams of pairs `(K, V1)` and `(K, V2)`, produces a new stream of nested pairs `(K, (V1, V2))`
    /// by equi-joining the two streams on the key attribute `K`.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use std::collections::HashSet;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let stream1 = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
    /// let stream2 = process.source_iter(q!(vec![(1, 'x'), (2, 'y')]));
    /// stream1.join(stream2)
    /// # }, |mut stream| async move {
    /// // (1, ('a', 'x')), (2, ('b', 'y'))
    /// # let expected = HashSet::from([(1, ('a', 'x')), (2, ('b', 'y'))]);
    /// # stream.map(|i| assert!(expected.contains(&i)));
    /// # }));
    /// # }
    /// ```
    pub fn join<V2, O2: Ordering, R2: Retries>(
        self,
        n: Stream<(K, V2), L, B, O2, R2>,
    ) -> Stream<(K, (V1, V2)), L, B, NoOrder, <R as MinRetries<R2>>::Min>
    where
        K: Eq + Hash,
        R: MinRetries<R2>,
    {
        check_matching_location(&self.location, &n.location);

        Stream::new(
            self.location.clone(),
            HydroNode::Join {
                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                right: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
                metadata: self.location.new_node_metadata(Stream::<
                    (K, (V1, V2)),
                    L,
                    B,
                    NoOrder,
                    <R as MinRetries<R2>>::Min,
                >::collection_kind()),
            },
        )
    }

    /// Given a stream of pairs `(K, V1)` and a bounded stream of keys `K`,
    /// computes the anti-join of the items in the input -- i.e. returns
    /// unique items in the first input that do not have a matching key
    /// in the second input.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let stream = process
    ///     .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
    ///     .batch(&tick, nondet!(/** test */));
    /// let batch = process
    ///     .source_iter(q!(vec![1, 2]))
    ///     .batch(&tick, nondet!(/** test */));
    /// stream.anti_join(batch).all_ticks()
    /// # }, |mut stream| async move {
    /// # for w in vec![(3, 'c'), (4, 'd')] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn anti_join<O2: Ordering, R2: Retries>(
        self,
        n: Stream<K, L, Bounded, O2, R2>,
    ) -> Stream<(K, V1), L, B, O, R>
    where
        K: Eq + Hash,
    {
        check_matching_location(&self.location, &n.location);

        // The negative side must be `Bounded` so membership is fully known;
        // ordering and retries of the positive side pass through unchanged.
        Stream::new(
            self.location.clone(),
            HydroNode::AntiJoin {
                pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                neg: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
                metadata: self
                    .location
                    .new_node_metadata(Stream::<(K, V1), L, B, O, R>::collection_kind()),
            },
        )
    }
}
2518
2519impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
2520 Stream<(K, V), L, B, O, R>
2521{
2522 /// Transforms this stream into a [`KeyedStream`], where the first element of each tuple
2523 /// is used as the key and the second element is added to the entries associated with that key.
2524 ///
2525 /// Because [`KeyedStream`] lazily groups values into buckets, this operator has zero computational
2526 /// cost and _does not_ require that the key type is hashable. Keyed streams are useful for
2527 /// performing grouped aggregations, but also for more precise ordering guarantees such as
2528 /// total ordering _within_ each group but no ordering _across_ groups.
2529 ///
2530 /// # Example
2531 /// ```rust
2532 /// # #[cfg(feature = "deploy")] {
2533 /// # use hydro_lang::prelude::*;
2534 /// # use futures::StreamExt;
2535 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2536 /// process
2537 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
2538 /// .into_keyed()
2539 /// # .entries()
2540 /// # }, |mut stream| async move {
2541 /// // { 1: [2, 3], 2: [4] }
2542 /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
2543 /// # assert_eq!(stream.next().await.unwrap(), w);
2544 /// # }
2545 /// # }));
2546 /// # }
2547 /// ```
2548 pub fn into_keyed(self) -> KeyedStream<K, V, L, B, O, R> {
2549 KeyedStream::new(
2550 self.location.clone(),
2551 HydroNode::Cast {
2552 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2553 metadata: self
2554 .location
2555 .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
2556 },
2557 )
2558 }
2559}
2560
impl<'a, K, V, L, O: Ordering, R: Retries> Stream<(K, V), Tick<L>, Bounded, O, R>
where
    K: Eq + Hash,
    L: Location<'a>,
{
    /// Given a stream of pairs `(K, V)`, produces a new stream of unique keys `K`.
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
    /// let batch = numbers.batch(&tick, nondet!(/** test */));
    /// batch.keys().all_ticks()
    /// # }, |mut stream| async move {
    /// // 1, 2
    /// # assert_eq!(stream.next().await.unwrap(), 1);
    /// # assert_eq!(stream.next().await.unwrap(), 2);
    /// # }));
    /// # }
    /// ```
    pub fn keys(self) -> Stream<K, Tick<L>, Bounded, NoOrder, ExactlyOnce> {
        // Deduplicate by folding each key's values into `()`: the fold discards the
        // values entirely, which is why the manual commutativity and idempotence
        // proofs are trivially valid; the keyed singleton then yields each distinct
        // key exactly once.
        self.into_keyed()
            .fold(
                q!(|| ()),
                q!(
                    |_, _| {},
                    commutative = manual_proof!(/** values are ignored */),
                    idempotent = manual_proof!(/** values are ignored */)
                ),
            )
            .keys()
    }
}
2597
2598impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Atomic<L>, B, O, R>
2599where
2600 L: Location<'a> + NoTick,
2601{
2602 /// Returns a stream corresponding to the latest batch of elements being atomically
2603 /// processed. These batches are guaranteed to be contiguous across ticks and preserve
2604 /// the order of the input.
2605 ///
2606 /// # Non-Determinism
2607 /// The batch boundaries are non-deterministic and may change across executions.
2608 pub fn batch_atomic(
2609 self,
2610 tick: &Tick<L>,
2611 _nondet: NonDet,
2612 ) -> Stream<T, Tick<L>, Bounded, O, R> {
2613 Stream::new(
2614 tick.clone(),
2615 HydroNode::Batch {
2616 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2617 metadata: tick
2618 .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2619 },
2620 )
2621 }
2622
2623 /// Yields the elements of this stream back into a top-level, asynchronous execution context.
2624 /// See [`Stream::atomic`] for more details.
2625 pub fn end_atomic(self) -> Stream<T, L, B, O, R> {
2626 Stream::new(
2627 self.location.tick.l.clone(),
2628 HydroNode::EndAtomic {
2629 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2630 metadata: self
2631 .location
2632 .tick
2633 .l
2634 .new_node_metadata(Stream::<T, L, B, O, R>::collection_kind()),
2635 },
2636 )
2637 }
2638}
2639
impl<'a, F, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<F, L, B, O, R>
where
    L: Location<'a> + NoTick + NoAtomic,
    F: Future<Output = T>,
{
    /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
    /// Future outputs are produced as available, regardless of input arrival order.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use std::collections::HashSet;
    /// # use futures::StreamExt;
    /// # use hydro_lang::prelude::*;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
    ///     .map(q!(|x| async move {
    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
    ///         x
    ///     }))
    ///     .resolve_futures()
    /// # },
    /// # |mut stream| async move {
    /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
    /// # let mut output = HashSet::new();
    /// # for _ in 1..10 {
    /// #     output.insert(stream.next().await.unwrap());
    /// # }
    /// # assert_eq!(
    /// #     output,
    /// #     HashSet::<i32>::from_iter(1..10)
    /// # );
    /// # },
    /// # ));
    /// # }
    /// ```
    pub fn resolve_futures(self) -> Stream<T, L, Unbounded, NoOrder, R> {
        // Output is `Unbounded` and `NoOrder`: futures may complete at any time
        // and in any order relative to their arrival.
        Stream::new(
            self.location.clone(),
            HydroNode::ResolveFutures {
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                metadata: self
                    .location
                    .new_node_metadata(Stream::<T, L, Unbounded, NoOrder, R>::collection_kind()),
            },
        )
    }

    /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
    /// Future outputs are produced in the same order as the input stream.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use std::collections::HashSet;
    /// # use futures::StreamExt;
    /// # use hydro_lang::prelude::*;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
    ///     .map(q!(|x| async move {
    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
    ///         x
    ///     }))
    ///     .resolve_futures_ordered()
    /// # },
    /// # |mut stream| async move {
    /// // 2, 3, 1, 9, 6, 5, 4, 7, 8
    /// # let mut output = Vec::new();
    /// # for _ in 1..10 {
    /// #     output.push(stream.next().await.unwrap());
    /// # }
    /// # assert_eq!(
    /// #     output,
    /// #     vec![2, 3, 1, 9, 6, 5, 4, 7, 8]
    /// # );
    /// # },
    /// # ));
    /// # }
    /// ```
    pub fn resolve_futures_ordered(self) -> Stream<T, L, Unbounded, O, R> {
        // Emission is held to input order, so the input's ordering marker `O` is preserved.
        Stream::new(
            self.location.clone(),
            HydroNode::ResolveFuturesOrdered {
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                metadata: self
                    .location
                    .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
            },
        )
    }
}
2729
impl<'a, T, L, O: Ordering, R: Retries> Stream<T, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    /// Asynchronously yields this batch of elements outside the tick as an unbounded stream,
    /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
    pub fn all_ticks(self) -> Stream<T, L, Unbounded, O, R> {
        Stream::new(
            // The output lives in the location *outside* the tick.
            self.location.outer().clone(),
            HydroNode::YieldConcat {
                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                metadata: self
                    .location
                    .outer()
                    .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
            },
        )
    }

    /// Synchronously yields this batch of elements outside the tick as an unbounded stream,
    /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
    ///
    /// Unlike [`Stream::all_ticks`], this preserves synchronous execution, as the output stream
    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
    /// stream's [`Tick`] context.
    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, O, R> {
        // Wrap the input's tick in an `Atomic` marker so downstream processing
        // stays synchronized with this tick.
        let out_location = Atomic {
            tick: self.location.clone(),
        };

        Stream::new(
            out_location.clone(),
            HydroNode::YieldConcat {
                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                metadata: out_location
                    .new_node_metadata(Stream::<T, Atomic<L>, Unbounded, O, R>::collection_kind()),
            },
        )
    }

    /// Transforms the stream using the given closure in "stateful" mode, where stateful operators
    /// such as `fold` retain their memory across ticks rather than resetting across batches of
    /// input.
    ///
    /// This API is particularly useful for stateful computation on batches of data, such as
    /// maintaining an accumulated state that is up to date with the current batch.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// # // ticks are lazy by default, forces the second tick to run
    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
    /// # let batch_first_tick = process
    /// #     .source_iter(q!(vec![1, 2, 3, 4]))
    /// #     .batch(&tick, nondet!(/** test */));
    /// # let batch_second_tick = process
    /// #     .source_iter(q!(vec![5, 6, 7]))
    /// #     .batch(&tick, nondet!(/** test */))
    /// #     .defer_tick(); // appears on the second tick
    /// let input = // [1, 2, 3, 4 (first batch), 5, 6, 7 (second batch)]
    /// # batch_first_tick.chain(batch_second_tick).all_ticks();
    ///
    /// input.batch(&tick, nondet!(/** test */))
    ///     .across_ticks(|s| s.count()).all_ticks()
    /// # }, |mut stream| async move {
    /// // [4, 7]
    /// assert_eq!(stream.next().await.unwrap(), 4);
    /// assert_eq!(stream.next().await.unwrap(), 7);
    /// # }));
    /// # }
    /// ```
    pub fn across_ticks<Out: BatchAtomic>(
        self,
        thunk: impl FnOnce(Stream<T, Atomic<L>, Unbounded, O, R>) -> Out,
    ) -> Out::Batched {
        // Lift the batch into an atomic unbounded stream so the closure's
        // stateful operators persist across ticks, then re-batch the result.
        thunk(self.all_ticks_atomic()).batched_atomic()
    }

    /// Shifts the elements in `self` to the **next tick**, so that the returned stream at tick `T`
    /// always has the elements of `self` at tick `T - 1`.
    ///
    /// At tick `0`, the output stream is empty, since there is no previous tick.
    ///
    /// This operator enables stateful iterative processing with ticks, by sending data from one
    /// tick to the next. For example, you can use it to compare inputs across consecutive batches.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// // ticks are lazy by default, forces the second tick to run
    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
    ///
    /// let batch_first_tick = process
    ///     .source_iter(q!(vec![1, 2, 3, 4]))
    ///     .batch(&tick, nondet!(/** test */));
    /// let batch_second_tick = process
    ///     .source_iter(q!(vec![0, 3, 4, 5, 6]))
    ///     .batch(&tick, nondet!(/** test */))
    ///     .defer_tick(); // appears on the second tick
    /// let changes_across_ticks = batch_first_tick.chain(batch_second_tick);
    ///
    /// changes_across_ticks.clone().filter_not_in(
    ///     changes_across_ticks.defer_tick() // the elements from the previous tick
    /// ).all_ticks()
    /// # }, |mut stream| async move {
    /// // [1, 2, 3, 4 /* first tick */, 0, 5, 6 /* second tick */]
    /// # for w in vec![1, 2, 3, 4, 0, 5, 6] {
    /// #     assert_eq!(stream.next().await.unwrap(), w);
    /// # }
    /// # }));
    /// # }
    /// ```
    pub fn defer_tick(self) -> Stream<T, Tick<L>, Bounded, O, R> {
        Stream::new(
            self.location.clone(),
            HydroNode::DeferTick {
                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                metadata: self
                    .location
                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
            },
        )
    }
}
2862
/// Tests for [`Stream`] operators, split between real deployments
/// (`feature = "deploy"`) and exhaustive simulation (`feature = "sim"`).
#[cfg(test)]
mod tests {
    #[cfg(feature = "deploy")]
    use futures::{SinkExt, StreamExt};
    #[cfg(feature = "deploy")]
    use hydro_deploy::Deployment;
    #[cfg(feature = "deploy")]
    use serde::{Deserialize, Serialize};
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use stageleft::q;

    #[cfg(any(feature = "deploy", feature = "sim"))]
    use crate::compile::builder::FlowBuilder;
    #[cfg(feature = "deploy")]
    use crate::live_collections::sliced::sliced;
    #[cfg(feature = "deploy")]
    use crate::live_collections::stream::ExactlyOnce;
    #[cfg(feature = "sim")]
    use crate::live_collections::stream::NoOrder;
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use crate::live_collections::stream::TotalOrder;
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use crate::location::Location;
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use crate::nondet::nondet;

    mod backtrace_chained_ops;

    // Marker types naming the two processes in the distributed test below.
    #[cfg(feature = "deploy")]
    struct P1 {}
    #[cfg(feature = "deploy")]
    struct P2 {}

    #[cfg(feature = "deploy")]
    #[derive(Serialize, Deserialize, Debug)]
    struct SendOverNetwork {
        n: u32,
    }

    // End-to-end: values sent P1 -> P2 over TCP arrive intact and in order.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn first_ten_distributed() {
        use crate::networking::TCP;

        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let first_node = flow.process::<P1>();
        let second_node = flow.process::<P2>();
        let external = flow.external::<P2>();

        let numbers = first_node.source_iter(q!(0..10));
        let out_port = numbers
            .map(q!(|n| SendOverNetwork { n }))
            .send(&second_node, TCP.fail_stop().bincode())
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&first_node, deployment.Localhost())
            .with_process(&second_node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_out = nodes.connect(out_port).await;

        deployment.start().await.unwrap();

        for i in 0..10 {
            assert_eq!(external_out.next().await.unwrap().n, i);
        }
    }

    // `first()` on a flattened singleton yields exactly one element.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn first_cardinality() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let node_tick = node.tick();
        let count = node_tick
            .singleton(q!([1, 2, 3]))
            .into_stream()
            .flatten_ordered()
            .first()
            .into_stream()
            .count()
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_out = nodes.connect(count).await;

        deployment.start().await.unwrap();

        assert_eq!(external_out.next().await.unwrap(), 1);
    }

    // `reduce` on an unbounded stream accumulates across inputs (1 then 1+2=3).
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn unbounded_reduce_remembers_state() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let out = input
            .reduce(q!(|acc, v| *acc += v))
            .sample_eager(nondet!(/** test */))
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 1);

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 3);
    }

    // A top-level bounded fold can be crossed with an unbounded input stream.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn top_level_bounded_cross_singleton() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) =
            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);

        let out = input
            .cross_singleton(
                node.source_iter(q!(vec![1, 2, 3]))
                    .fold(q!(|| 0), q!(|acc, v| *acc += v)),
            )
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (1, 6));

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (2, 6));
    }

    // Inside `sliced!`, a bounded `reduce` produces exactly one value (count 1).
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn top_level_bounded_reduce_cardinality() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) =
            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);

        let out = sliced! {
            let input = use(input, nondet!(/** test */));
            let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)), nondet!(/** test */));
            input.cross_singleton(v.into_stream().count())
        }
        .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (1, 1));

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (2, 1));
    }

    // Same as above, but through `into_singleton()`; cardinality is still 1.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn top_level_bounded_into_singleton_cardinality() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) =
            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);

        let out = sliced! {
            let input = use(input, nondet!(/** test */));
            let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)).into_singleton(), nondet!(/** test */));
            input.cross_singleton(v.into_stream().count())
        }
        .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (1, 1));

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (2, 1));
    }

    // An atomic fold snapshotted each tick yields the full total (6) every time.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn atomic_fold_replays_each_tick() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) =
            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
        let tick = node.tick();

        let out = input
            .batch(&tick, nondet!(/** test */))
            .cross_singleton(
                node.source_iter(q!(vec![1, 2, 3]))
                    .atomic()
                    .fold(q!(|| 0), q!(|acc, v| *acc += v))
                    .snapshot_atomic(&tick, nondet!(/** test */)),
            )
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (1, 6));

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (2, 6));
    }

    // `scan` keeps its accumulator across inputs on an unbounded stream.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn unbounded_scan_remembers_state() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let out = input
            .scan(
                q!(|| 0),
                q!(|acc, v| {
                    *acc += v;
                    Some(*acc)
                }),
            )
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 1);

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 3);
    }

    // `enumerate` numbering continues across separately-arriving inputs.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn unbounded_enumerate_remembers_state() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let out = input.enumerate().send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (0, 1));

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), (1, 2));
    }

    // `unique` suppresses duplicates seen in earlier inputs (second `1` is dropped).
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn unbounded_unique_remembers_state() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) =
            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
        let out = input.unique().send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 1);

        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 2);

        external_in.send(1).await.unwrap();
        external_in.send(3).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 3);
    }

    // The simulator explores batch boundaries, so assuming all 3 elements land
    // in one batch must fail in at least one explored schedule.
    #[cfg(feature = "sim")]
    #[test]
    #[should_panic]
    fn sim_batch_nondet_size() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, TotalOrder, _>();

        let tick = node.tick();
        let out_recv = input
            .batch(&tick, nondet!(/** test */))
            .count()
            .all_ticks()
            .sim_output();

        flow.sim().exhaustive(async || {
            in_send.send(());
            in_send.send(());
            in_send.send(());

            assert_eq!(out_recv.next().await.unwrap(), 3); // fails with nondet batching
        });
    }

    // Batching a totally-ordered stream never reorders elements.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_batch_preserves_order() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input();

        let tick = node.tick();
        let out_recv = input
            .batch(&tick, nondet!(/** test */))
            .all_ticks()
            .sim_output();

        flow.sim().exhaustive(async || {
            in_send.send(1);
            in_send.send(2);
            in_send.send(3);

            out_recv.assert_yields_only([1, 2, 3]).await;
        });
    }

    // Batching a `NoOrder` stream explores shuffles: some schedule produces both
    // (1, 3) and (2, 2) min/max pairs, proving the simulator reorders.
    #[cfg(feature = "sim")]
    #[test]
    #[should_panic]
    fn sim_batch_unordered_shuffles() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let tick = node.tick();
        let batch = input.batch(&tick, nondet!(/** test */));
        let out_recv = batch
            .clone()
            .min()
            .zip(batch.max())
            .all_ticks()
            .sim_output();

        flow.sim().exhaustive(async || {
            in_send.send_many_unordered([1, 2, 3]);

            if out_recv.collect::<Vec<_>>().await == vec![(1, 3), (2, 2)] {
                panic!("saw both (1, 3) and (2, 2), so batching must have shuffled the order");
            }
        });
    }

    // Counts the distinct schedules the simulator explores when batching 4
    // unordered elements (ordered set partitions of 4 elements).
    #[cfg(feature = "sim")]
    #[test]
    fn sim_batch_unordered_shuffles_count() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let tick = node.tick();
        let batch = input.batch(&tick, nondet!(/** test */));
        let out_recv = batch.all_ticks().sim_output();

        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([1, 2, 3, 4]);
            out_recv.assert_yields_only_unordered([1, 2, 3, 4]).await;
        });

        assert_eq!(
            instance_count,
            75 // ∑ (k=1 to 4) S(4,k) × k! = 75
        )
    }

    // `assume_ordering` on a batched `NoOrder` stream does not make the original
    // order reappear in every schedule, so the ordered assertion must fail.
    #[cfg(feature = "sim")]
    #[test]
    #[should_panic]
    fn sim_observe_order_batched() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let tick = node.tick();
        let batch = input.batch(&tick, nondet!(/** test */));
        let out_recv = batch
            .assume_ordering::<TotalOrder>(nondet!(/** test */))
            .all_ticks()
            .sim_output();

        flow.sim().exhaustive(async || {
            in_send.send_many_unordered([1, 2, 3, 4]);
            out_recv.assert_yields_only([1, 2, 3, 4]).await; // fails with assume_ordering
        });
    }

    // Counts the schedules explored once the order is observed downstream.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_observe_order_batched_count() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let tick = node.tick();
        let batch = input.batch(&tick, nondet!(/** test */));
        let out_recv = batch
            .assume_ordering::<TotalOrder>(nondet!(/** test */))
            .all_ticks()
            .sim_output();

        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([1, 2, 3, 4]);
            let _ = out_recv.collect::<Vec<_>>().await;
        });

        assert_eq!(
            instance_count,
            192 // 4! * 2^{4 - 1}
        )
    }

    // `count` is order-insensitive, so only snapshot boundaries multiply instances.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_unordered_count_instance_count() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let tick = node.tick();
        let out_recv = input
            .count()
            .snapshot(&tick, nondet!(/** test */))
            .all_ticks()
            .sim_output();

        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([1, 2, 3, 4]);
            assert!(out_recv.collect::<Vec<_>>().await.last().unwrap() == &4);
        });

        assert_eq!(
            instance_count,
            16 // 2^4, { 0, 1, 2, 3 } can be a snapshot and 4 is always included
        )
    }

    // Top-level `assume_ordering` explores all 3! interleavings of the input.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_top_level_assume_ordering() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let out_recv = input
            .assume_ordering::<TotalOrder>(nondet!(/** test */))
            .sim_output();

        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([1, 2, 3]);
            let mut out = out_recv.collect::<Vec<_>>().await;
            out.sort();
            assert_eq!(out, vec![1, 2, 3]);
        });

        assert_eq!(instance_count, 6)
    }

    // A cycle feeding back into `assume_ordering` can interleave generated
    // elements between external inputs (0, 1, 2 in order must be reachable).
    #[cfg(feature = "sim")]
    #[test]
    fn sim_top_level_assume_ordering_cycle_back() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let (complete_cycle_back, cycle_back) =
            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
        let ordered = input
            .merge_unordered(cycle_back)
            .assume_ordering::<TotalOrder>(nondet!(/** test */));
        complete_cycle_back.complete(
            ordered
                .clone()
                .map(q!(|v| v + 1))
                .filter(q!(|v| v % 2 == 1)),
        );

        let out_recv = ordered.sim_output();

        let mut saw = false;
        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([0, 2]);
            let out = out_recv.collect::<Vec<_>>().await;

            if out.starts_with(&[0, 1, 2]) {
                saw = true;
            }
        });

        assert!(saw, "did not see an instance with 0, 1, 2 in order");
        assert_eq!(instance_count, 6)
    }

    // Same cycle-back shape, but routed through a tick batch; batching adds
    // many more explored schedules.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_top_level_assume_ordering_cycle_back_tick() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let (complete_cycle_back, cycle_back) =
            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
        let ordered = input
            .merge_unordered(cycle_back)
            .assume_ordering::<TotalOrder>(nondet!(/** test */));
        complete_cycle_back.complete(
            ordered
                .clone()
                .batch(&node.tick(), nondet!(/** test */))
                .all_ticks()
                .map(q!(|v| v + 1))
                .filter(q!(|v| v % 2 == 1)),
        );

        let out_recv = ordered.sim_output();

        let mut saw = false;
        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([0, 2]);
            let out = out_recv.collect::<Vec<_>>().await;

            if out.starts_with(&[0, 1, 2]) {
                saw = true;
            }
        });

        assert!(saw, "did not see an instance with 0, 1, 2 in order");
        assert_eq!(instance_count, 58)
    }

    // Two chained `assume_ordering` points multiply the explored interleavings;
    // a fed-back element (3) can appear between the external inputs.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_top_level_assume_ordering_multiple() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
        let (_, input2) = node.sim_input::<_, NoOrder, _>();

        let (complete_cycle_back, cycle_back) =
            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
        let input1_ordered = input
            .clone()
            .merge_unordered(cycle_back)
            .assume_ordering::<TotalOrder>(nondet!(/** test */));
        let foo = input1_ordered
            .clone()
            .map(q!(|v| v + 3))
            .weaken_ordering::<NoOrder>()
            .merge_unordered(input2)
            .assume_ordering::<TotalOrder>(nondet!(/** test */));

        complete_cycle_back.complete(foo.filter(q!(|v| *v == 3)));

        let out_recv = input1_ordered.sim_output();

        let mut saw = false;
        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([0, 1]);
            let out = out_recv.collect::<Vec<_>>().await;

            if out.starts_with(&[0, 3, 1]) {
                saw = true;
            }
        });

        assert!(saw, "did not see an instance with 0, 3, 1 in order");
        assert_eq!(instance_count, 24)
    }

    // `assume_ordering` inside an atomic section constrains the schedules
    // relative to the non-atomic cycle-back variant above.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_atomic_assume_ordering_cycle_back() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_send, input) = node.sim_input::<_, NoOrder, _>();

        let (complete_cycle_back, cycle_back) =
            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
        let ordered = input
            .merge_unordered(cycle_back)
            .atomic()
            .assume_ordering::<TotalOrder>(nondet!(/** test */))
            .end_atomic();
        complete_cycle_back.complete(
            ordered
                .clone()
                .map(q!(|v| v + 1))
                .filter(q!(|v| v % 2 == 1)),
        );

        let out_recv = ordered.sim_output();

        let instance_count = flow.sim().exhaustive(async || {
            in_send.send_many_unordered([0, 2]);
            let out = out_recv.collect::<Vec<_>>().await;
            assert_eq!(out.len(), 4);
        });

        assert_eq!(instance_count, 22)
    }

    // `partition` routes each element to exactly one of the two outputs.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn partition_evens_odds() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let numbers = node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6]));
        let (evens, odds) = numbers.partition(q!(|x: &i32| x % 2 == 0));
        let evens_port = evens.send_bincode_external(&external);
        let odds_port = odds.send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut evens_out = nodes.connect(evens_port).await;
        let mut odds_out = nodes.connect(odds_port).await;

        deployment.start().await.unwrap();

        let mut even_results = Vec::new();
        for _ in 0..3 {
            even_results.push(evens_out.next().await.unwrap());
        }
        even_results.sort();
        assert_eq!(even_results, vec![2, 4, 6]);

        let mut odd_results = Vec::new();
        for _ in 0..3 {
            odd_results.push(odds_out.next().await.unwrap());
        }
        odd_results.sort();
        assert_eq!(odd_results, vec![1, 3, 5]);
    }

    // An `.inspect()` whose result is dropped still executes its side effect.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn unconsumed_inspect_still_runs() {
        use crate::deploy::DeployCrateWrapper;

        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        // The return value of .inspect() is intentionally dropped.
        // Before the Null-root fix, this would silently do nothing.
        node.source_iter(q!(0..5))
            .inspect(q!(|x| println!("inspect: {}", x)));

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut stdout = nodes.get_process(&node).stdout();

        deployment.start().await.unwrap();

        let mut lines = Vec::new();
        for _ in 0..5 {
            lines.push(stdout.recv().await.unwrap());
        }
        lines.sort();
        assert_eq!(
            lines,
            vec![
                "inspect: 0",
                "inspect: 1",
                "inspect: 2",
                "inspect: 3",
                "inspect: 4",
            ]
        );
    }
}