hydro_lang/live_collections/optional.rs
1//! Definitions for the [`Optional`] live collection.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::ops::Deref;
6use std::rc::Rc;
7
8use stageleft::{IntoQuotedMut, QuotedWithContext, q};
9use syn::parse_quote;
10
11use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
12use super::singleton::Singleton;
13use super::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Stream, TotalOrder};
14use crate::compile::builder::{CycleId, FlowState};
15use crate::compile::ir::{CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode};
16#[cfg(stageleft_runtime)]
17use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
18use crate::forward_handle::{ForwardRef, TickCycle};
19#[cfg(stageleft_runtime)]
20use crate::location::dynamic::{DynLocation, LocationId};
21use crate::location::tick::{Atomic, DeferTick, NoAtomic};
22use crate::location::{Location, NoTick, Tick, check_matching_location};
23use crate::nondet::{NonDet, nondet};
24
25/// A *nullable* Rust value that can asynchronously change over time.
26///
27/// Optionals are the live collection equivalent of [`Option`]. If the optional is [`Bounded`],
28/// the value is frozen and will not change. But if it is [`Unbounded`], the value will
/// asynchronously change over time, including becoming present or uninhabited.
30///
31/// Optionals are used in many of the same places as [`Singleton`], but when the value may be
32/// nullable. For example, the first element of a [`Stream`] is exposed as an [`Optional`].
33///
34/// Type Parameters:
35/// - `Type`: the type of the value in this optional (when it is not null)
36/// - `Loc`: the [`Location`] where the optional is materialized
37/// - `Bound`: tracks whether the value is [`Bounded`] (fixed) or [`Unbounded`] (changing asynchronously)
pub struct Optional<Type, Loc, Bound: Boundedness> {
    /// Where this optional is materialized; handed out by `location()`.
    pub(crate) location: Loc,
    /// IR node that produces this optional. Kept in a `RefCell` so it can be swapped for
    /// `HydroNode::Placeholder` (or wrapped in a `Tee` on clone) through a shared reference.
    pub(crate) ir_node: RefCell<HydroNode>,
    /// Flow state the `Drop` impl pushes a `HydroRoot::Null` root into when the node was
    /// never consumed.
    pub(crate) flow_state: FlowState,

    // Zero-sized marker tying the otherwise unused type parameters to the struct.
    _phantom: PhantomData<(Type, Loc, Bound)>,
}
45
46impl<T, L, B: Boundedness> Drop for Optional<T, L, B> {
47 fn drop(&mut self) {
48 let ir_node = self.ir_node.replace(HydroNode::Placeholder);
49 if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
50 self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
51 input: Box::new(ir_node),
52 op_metadata: HydroIrOpMetadata::new(),
53 });
54 }
55 }
56}
57
58impl<'a, T, L> From<Optional<T, L, Bounded>> for Optional<T, L, Unbounded>
59where
60 T: Clone,
61 L: Location<'a> + NoTick,
62{
63 fn from(value: Optional<T, L, Bounded>) -> Self {
64 let tick = value.location().tick();
65 value.clone_into_tick(&tick).latest()
66 }
67}
68
69impl<'a, T, L> DeferTick for Optional<T, Tick<L>, Bounded>
70where
71 L: Location<'a>,
72{
73 fn defer_tick(self) -> Self {
74 Optional::defer_tick(self)
75 }
76}
77
78impl<'a, T, L> CycleCollection<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
79where
80 L: Location<'a>,
81{
82 type Location = Tick<L>;
83
84 fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
85 Optional::new(
86 location.clone(),
87 HydroNode::CycleSource {
88 cycle_id,
89 metadata: location.new_node_metadata(Self::collection_kind()),
90 },
91 )
92 }
93}
94
95impl<'a, T, L> CycleCollectionWithInitial<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
96where
97 L: Location<'a>,
98{
99 type Location = Tick<L>;
100
101 fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
102 let from_previous_tick: Optional<T, Tick<L>, Bounded> = Optional::new(
103 location.clone(),
104 HydroNode::DeferTick {
105 input: Box::new(HydroNode::CycleSource {
106 cycle_id,
107 metadata: location.new_node_metadata(Self::collection_kind()),
108 }),
109 metadata: location
110 .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
111 },
112 );
113
114 from_previous_tick.or(initial.filter_if(location.optional_first_tick(q!(())).is_some()))
115 }
116}
117
118impl<'a, T, L> ReceiverComplete<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
119where
120 L: Location<'a>,
121{
122 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
123 assert_eq!(
124 Location::id(&self.location),
125 expected_location,
126 "locations do not match"
127 );
128 self.location
129 .flow_state()
130 .borrow_mut()
131 .push_root(HydroRoot::CycleSink {
132 cycle_id,
133 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
134 op_metadata: HydroIrOpMetadata::new(),
135 });
136 }
137}
138
139impl<'a, T, L> CycleCollection<'a, ForwardRef> for Optional<T, Tick<L>, Bounded>
140where
141 L: Location<'a>,
142{
143 type Location = Tick<L>;
144
145 fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
146 Optional::new(
147 location.clone(),
148 HydroNode::CycleSource {
149 cycle_id,
150 metadata: location.new_node_metadata(Self::collection_kind()),
151 },
152 )
153 }
154}
155
156impl<'a, T, L> ReceiverComplete<'a, ForwardRef> for Optional<T, Tick<L>, Bounded>
157where
158 L: Location<'a>,
159{
160 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
161 assert_eq!(
162 Location::id(&self.location),
163 expected_location,
164 "locations do not match"
165 );
166 self.location
167 .flow_state()
168 .borrow_mut()
169 .push_root(HydroRoot::CycleSink {
170 cycle_id,
171 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
172 op_metadata: HydroIrOpMetadata::new(),
173 });
174 }
175}
176
177impl<'a, T, L, B: Boundedness> CycleCollection<'a, ForwardRef> for Optional<T, L, B>
178where
179 L: Location<'a> + NoTick,
180{
181 type Location = L;
182
183 fn create_source(cycle_id: CycleId, location: L) -> Self {
184 Optional::new(
185 location.clone(),
186 HydroNode::CycleSource {
187 cycle_id,
188 metadata: location.new_node_metadata(Self::collection_kind()),
189 },
190 )
191 }
192}
193
194impl<'a, T, L, B: Boundedness> ReceiverComplete<'a, ForwardRef> for Optional<T, L, B>
195where
196 L: Location<'a> + NoTick,
197{
198 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
199 assert_eq!(
200 Location::id(&self.location),
201 expected_location,
202 "locations do not match"
203 );
204 self.location
205 .flow_state()
206 .borrow_mut()
207 .push_root(HydroRoot::CycleSink {
208 cycle_id,
209 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
210 op_metadata: HydroIrOpMetadata::new(),
211 });
212 }
213}
214
215impl<'a, T, L, B: Boundedness> From<Singleton<T, L, B>> for Optional<T, L, B>
216where
217 L: Location<'a>,
218{
219 fn from(singleton: Singleton<T, L, B>) -> Self {
220 Optional::new(
221 singleton.location.clone(),
222 HydroNode::Cast {
223 inner: Box::new(singleton.ir_node.replace(HydroNode::Placeholder)),
224 metadata: singleton
225 .location
226 .new_node_metadata(Self::collection_kind()),
227 },
228 )
229 }
230}
231
/// Tuples the values of two optionals at the same location by emitting a
/// `HydroNode::CrossSingleton` over their IR nodes.
///
/// Calls `check_matching_location` on the two inputs before building the node.
#[cfg(stageleft_runtime)]
pub(super) fn zip_inside_tick<'a, T, O, L: Location<'a>, B: Boundedness>(
    me: Optional<T, L, B>,
    other: Optional<O, L, B>,
) -> Optional<(T, O), L, B> {
    check_matching_location(&me.location, &other.location);

    // Both inputs give up their IR nodes; the placeholders left behind keep `Drop` quiet.
    let left = Box::new(me.ir_node.replace(HydroNode::Placeholder));
    let right = Box::new(other.ir_node.replace(HydroNode::Placeholder));
    let metadata = me
        .location
        .new_node_metadata(Optional::<(T, O), L, B>::collection_kind());

    Optional::new(
        me.location.clone(),
        HydroNode::CrossSingleton {
            left,
            right,
            metadata,
        },
    )
}
250
/// Combines two optionals at the same location into a `HydroNode::ChainFirst`, with `me` as
/// the `first` input and `other` as the `second`.
///
/// Calls `check_matching_location` on the two inputs before building the node.
#[cfg(stageleft_runtime)]
fn or_inside_tick<'a, T, L: Location<'a>, B: Boundedness>(
    me: Optional<T, L, B>,
    other: Optional<T, L, B>,
) -> Optional<T, L, B> {
    check_matching_location(&me.location, &other.location);

    // Both inputs give up their IR nodes; the placeholders left behind keep `Drop` quiet.
    let first = Box::new(me.ir_node.replace(HydroNode::Placeholder));
    let second = Box::new(other.ir_node.replace(HydroNode::Placeholder));
    let metadata = me
        .location
        .new_node_metadata(Optional::<T, L, B>::collection_kind());

    Optional::new(
        me.location.clone(),
        HydroNode::ChainFirst {
            first,
            second,
            metadata,
        },
    )
}
269
270impl<'a, T, L, B: Boundedness> Clone for Optional<T, L, B>
271where
272 T: Clone,
273 L: Location<'a>,
274{
275 fn clone(&self) -> Self {
276 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
277 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
278 *self.ir_node.borrow_mut() = HydroNode::Tee {
279 inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
280 metadata: self.location.new_node_metadata(Self::collection_kind()),
281 };
282 }
283
284 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
285 Optional {
286 location: self.location.clone(),
287 flow_state: self.flow_state.clone(),
288 ir_node: HydroNode::Tee {
289 inner: SharedNode(inner.0.clone()),
290 metadata: metadata.clone(),
291 }
292 .into(),
293 _phantom: PhantomData,
294 }
295 } else {
296 unreachable!()
297 }
298 }
299}
300
301impl<'a, T, L, B: Boundedness> Optional<T, L, B>
302where
303 L: Location<'a>,
304{
305 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
306 debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
307 debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
308 let flow_state = location.flow_state().clone();
309 Optional {
310 location,
311 flow_state,
312 ir_node: RefCell::new(ir_node),
313 _phantom: PhantomData,
314 }
315 }
316
317 pub(crate) fn collection_kind() -> CollectionKind {
318 CollectionKind::Optional {
319 bound: B::BOUND_KIND,
320 element_type: stageleft::quote_type::<T>().into(),
321 }
322 }
323
324 /// Returns the [`Location`] where this optional is being materialized.
325 pub fn location(&self) -> &L {
326 &self.location
327 }
328
329 /// Transforms the optional value by applying a function `f` to it,
330 /// continuously as the input is updated.
331 ///
332 /// Whenever the optional is empty, the output optional is also empty.
333 ///
334 /// # Example
335 /// ```rust
336 /// # #[cfg(feature = "deploy")] {
337 /// # use hydro_lang::prelude::*;
338 /// # use futures::StreamExt;
339 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
340 /// let tick = process.tick();
341 /// let optional = tick.optional_first_tick(q!(1));
342 /// optional.map(q!(|v| v + 1)).all_ticks()
343 /// # }, |mut stream| async move {
344 /// // 2
345 /// # assert_eq!(stream.next().await.unwrap(), 2);
346 /// # }));
347 /// # }
348 /// ```
349 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
350 where
351 F: Fn(T) -> U + 'a,
352 {
353 let f = f.splice_fn1_ctx(&self.location).into();
354 Optional::new(
355 self.location.clone(),
356 HydroNode::Map {
357 f,
358 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
359 metadata: self
360 .location
361 .new_node_metadata(Optional::<U, L, B>::collection_kind()),
362 },
363 )
364 }
365
366 /// Transforms the optional value by applying a function `f` to it and then flattening
367 /// the result into a stream, preserving the order of elements.
368 ///
369 /// If the optional is empty, the output stream is also empty. If the optional contains
370 /// a value, `f` is applied to produce an iterator, and all items from that iterator
371 /// are emitted in the output stream in deterministic order.
372 ///
373 /// The implementation of [`Iterator`] for the output type `I` must produce items in a
374 /// **deterministic** order. For example, `I` could be a `Vec`, but not a `HashSet`.
375 /// If the order is not deterministic, use [`Optional::flat_map_unordered`] instead.
376 ///
377 /// # Example
378 /// ```rust
379 /// # #[cfg(feature = "deploy")] {
380 /// # use hydro_lang::prelude::*;
381 /// # use futures::StreamExt;
382 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
383 /// let tick = process.tick();
384 /// let optional = tick.optional_first_tick(q!(vec![1, 2, 3]));
385 /// optional.flat_map_ordered(q!(|v| v)).all_ticks()
386 /// # }, |mut stream| async move {
387 /// // 1, 2, 3
388 /// # for w in vec![1, 2, 3] {
389 /// # assert_eq!(stream.next().await.unwrap(), w);
390 /// # }
391 /// # }));
392 /// # }
393 /// ```
394 pub fn flat_map_ordered<U, I, F>(
395 self,
396 f: impl IntoQuotedMut<'a, F, L>,
397 ) -> Stream<U, L, Bounded, TotalOrder, ExactlyOnce>
398 where
399 B: IsBounded,
400 I: IntoIterator<Item = U>,
401 F: Fn(T) -> I + 'a,
402 {
403 self.into_stream().flat_map_ordered(f)
404 }
405
406 /// Like [`Optional::flat_map_ordered`], but allows the implementation of [`Iterator`]
407 /// for the output type `I` to produce items in any order.
408 ///
409 /// If the optional is empty, the output stream is also empty. If the optional contains
410 /// a value, `f` is applied to produce an iterator, and all items from that iterator
411 /// are emitted in the output stream in non-deterministic order.
412 ///
413 /// # Example
414 /// ```rust
415 /// # #[cfg(feature = "deploy")] {
416 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
417 /// # use futures::StreamExt;
418 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
419 /// let tick = process.tick();
420 /// let optional = tick.optional_first_tick(q!(
421 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
422 /// ));
423 /// optional.flat_map_unordered(q!(|v| v)).all_ticks()
424 /// # }, |mut stream| async move {
425 /// // 1, 2, 3, but in no particular order
426 /// # let mut results = Vec::new();
427 /// # for _ in 0..3 {
428 /// # results.push(stream.next().await.unwrap());
429 /// # }
430 /// # results.sort();
431 /// # assert_eq!(results, vec![1, 2, 3]);
432 /// # }));
433 /// # }
434 /// ```
435 pub fn flat_map_unordered<U, I, F>(
436 self,
437 f: impl IntoQuotedMut<'a, F, L>,
438 ) -> Stream<U, L, Bounded, NoOrder, ExactlyOnce>
439 where
440 B: IsBounded,
441 I: IntoIterator<Item = U>,
442 F: Fn(T) -> I + 'a,
443 {
444 self.into_stream().flat_map_unordered(f)
445 }
446
447 /// Flattens the optional value into a stream, preserving the order of elements.
448 ///
449 /// If the optional is empty, the output stream is also empty. If the optional contains
450 /// a value that implements [`IntoIterator`], all items from that iterator are emitted
451 /// in the output stream in deterministic order.
452 ///
453 /// The implementation of [`Iterator`] for the element type `T` must produce items in a
454 /// **deterministic** order. For example, `T` could be a `Vec`, but not a `HashSet`.
455 /// If the order is not deterministic, use [`Optional::flatten_unordered`] instead.
456 ///
457 /// # Example
458 /// ```rust
459 /// # #[cfg(feature = "deploy")] {
460 /// # use hydro_lang::prelude::*;
461 /// # use futures::StreamExt;
462 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
463 /// let tick = process.tick();
464 /// let optional = tick.optional_first_tick(q!(vec![1, 2, 3]));
465 /// optional.flatten_ordered().all_ticks()
466 /// # }, |mut stream| async move {
467 /// // 1, 2, 3
468 /// # for w in vec![1, 2, 3] {
469 /// # assert_eq!(stream.next().await.unwrap(), w);
470 /// # }
471 /// # }));
472 /// # }
473 /// ```
474 pub fn flatten_ordered<U>(self) -> Stream<U, L, Bounded, TotalOrder, ExactlyOnce>
475 where
476 B: IsBounded,
477 T: IntoIterator<Item = U>,
478 {
479 self.flat_map_ordered(q!(|v| v))
480 }
481
482 /// Like [`Optional::flatten_ordered`], but allows the implementation of [`Iterator`]
483 /// for the element type `T` to produce items in any order.
484 ///
485 /// If the optional is empty, the output stream is also empty. If the optional contains
486 /// a value that implements [`IntoIterator`], all items from that iterator are emitted
487 /// in the output stream in non-deterministic order.
488 ///
489 /// # Example
490 /// ```rust
491 /// # #[cfg(feature = "deploy")] {
492 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
493 /// # use futures::StreamExt;
494 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
495 /// let tick = process.tick();
496 /// let optional = tick.optional_first_tick(q!(
497 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
498 /// ));
499 /// optional.flatten_unordered().all_ticks()
500 /// # }, |mut stream| async move {
501 /// // 1, 2, 3, but in no particular order
502 /// # let mut results = Vec::new();
503 /// # for _ in 0..3 {
504 /// # results.push(stream.next().await.unwrap());
505 /// # }
506 /// # results.sort();
507 /// # assert_eq!(results, vec![1, 2, 3]);
508 /// # }));
509 /// # }
510 /// ```
511 pub fn flatten_unordered<U>(self) -> Stream<U, L, Bounded, NoOrder, ExactlyOnce>
512 where
513 B: IsBounded,
514 T: IntoIterator<Item = U>,
515 {
516 self.flat_map_unordered(q!(|v| v))
517 }
518
519 /// Creates an optional containing only the value if it satisfies a predicate `f`.
520 ///
521 /// If the optional is empty, the output optional is also empty. If the optional contains
522 /// a value and the predicate returns `true`, the output optional contains the same value.
523 /// If the predicate returns `false`, the output optional is empty.
524 ///
525 /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
526 /// not modify or take ownership of the value. If you need to modify the value while filtering
527 /// use [`Optional::filter_map`] instead.
528 ///
529 /// # Example
530 /// ```rust
531 /// # #[cfg(feature = "deploy")] {
532 /// # use hydro_lang::prelude::*;
533 /// # use futures::StreamExt;
534 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
535 /// let tick = process.tick();
536 /// let optional = tick.optional_first_tick(q!(5));
537 /// optional.filter(q!(|&x| x > 3)).all_ticks()
538 /// # }, |mut stream| async move {
539 /// // 5
540 /// # assert_eq!(stream.next().await.unwrap(), 5);
541 /// # }));
542 /// # }
543 /// ```
544 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
545 where
546 F: Fn(&T) -> bool + 'a,
547 {
548 let f = f.splice_fn1_borrow_ctx(&self.location).into();
549 Optional::new(
550 self.location.clone(),
551 HydroNode::Filter {
552 f,
553 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
554 metadata: self.location.new_node_metadata(Self::collection_kind()),
555 },
556 )
557 }
558
559 /// An operator that both filters and maps. It yields only the value if the supplied
560 /// closure `f` returns `Some(value)`.
561 ///
562 /// If the optional is empty, the output optional is also empty. If the optional contains
563 /// a value and the closure returns `Some(new_value)`, the output optional contains `new_value`.
564 /// If the closure returns `None`, the output optional is empty.
565 ///
566 /// # Example
567 /// ```rust
568 /// # #[cfg(feature = "deploy")] {
569 /// # use hydro_lang::prelude::*;
570 /// # use futures::StreamExt;
571 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
572 /// let tick = process.tick();
573 /// let optional = tick.optional_first_tick(q!("42"));
574 /// optional
575 /// .filter_map(q!(|s| s.parse::<i32>().ok()))
576 /// .all_ticks()
577 /// # }, |mut stream| async move {
578 /// // 42
579 /// # assert_eq!(stream.next().await.unwrap(), 42);
580 /// # }));
581 /// # }
582 /// ```
583 pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
584 where
585 F: Fn(T) -> Option<U> + 'a,
586 {
587 let f = f.splice_fn1_ctx(&self.location).into();
588 Optional::new(
589 self.location.clone(),
590 HydroNode::FilterMap {
591 f,
592 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
593 metadata: self
594 .location
595 .new_node_metadata(Optional::<U, L, B>::collection_kind()),
596 },
597 )
598 }
599
/// Combines this optional with another [`Singleton`] or [`Optional`] by tupling their values.
///
/// If the other value is an [`Optional`], the output will be non-null only if the argument is
/// non-null. This is useful for combining several pieces of state together.
604 ///
605 /// # Example
606 /// ```rust
607 /// # #[cfg(feature = "deploy")] {
608 /// # use hydro_lang::prelude::*;
609 /// # use futures::StreamExt;
610 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
611 /// let tick = process.tick();
612 /// let numbers = process
613 /// .source_iter(q!(vec![123, 456, 789]))
614 /// .batch(&tick, nondet!(/** test */));
615 /// let min = numbers.clone().min(); // Optional
616 /// let max = numbers.max(); // Optional
617 /// min.zip(max).all_ticks()
618 /// # }, |mut stream| async move {
619 /// // [(123, 789)]
620 /// # for w in vec![(123, 789)] {
621 /// # assert_eq!(stream.next().await.unwrap(), w);
622 /// # }
623 /// # }));
624 /// # }
625 /// ```
626 pub fn zip<O>(self, other: impl Into<Optional<O, L, B>>) -> Optional<(T, O), L, B>
627 where
628 B: IsBounded,
629 {
630 let other: Optional<O, L, B> = other.into();
631 check_matching_location(&self.location, &other.location);
632
633 if L::is_top_level()
634 && let Some(tick) = self.location.try_tick()
635 {
636 let out = zip_inside_tick(
637 self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
638 other.snapshot(&tick, nondet!(/** eventually stabilizes */)),
639 )
640 .latest();
641
642 Optional::new(
643 out.location.clone(),
644 out.ir_node.replace(HydroNode::Placeholder),
645 )
646 } else {
647 zip_inside_tick(self, other)
648 }
649 }
650
651 /// Passes through `self` when it has a value, otherwise passes through `other`.
652 ///
653 /// Like [`Option::or`], this is helpful for defining a fallback for an [`Optional`], when the
654 /// fallback itself is an [`Optional`]. If the fallback is a [`Singleton`], you can use
655 /// [`Optional::unwrap_or`] to ensure that the output is always non-null.
656 ///
657 /// If the inputs are [`Unbounded`], the output will be asynchronously updated as the contents
658 /// of the inputs change (including to/from null states).
659 ///
660 /// # Example
661 /// ```rust
662 /// # #[cfg(feature = "deploy")] {
663 /// # use hydro_lang::prelude::*;
664 /// # use futures::StreamExt;
665 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
666 /// let tick = process.tick();
667 /// // ticks are lazy by default, forces the second tick to run
668 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
669 ///
670 /// let some_first_tick = tick.optional_first_tick(q!(123));
671 /// let some_second_tick = tick.optional_first_tick(q!(456)).defer_tick();
672 /// some_first_tick.or(some_second_tick).all_ticks()
673 /// # }, |mut stream| async move {
674 /// // [123 /* first tick */, 456 /* second tick */]
675 /// # for w in vec![123, 456] {
676 /// # assert_eq!(stream.next().await.unwrap(), w);
677 /// # }
678 /// # }));
679 /// # }
680 /// ```
681 pub fn or(self, other: Optional<T, L, B>) -> Optional<T, L, B> {
682 check_matching_location(&self.location, &other.location);
683
684 if L::is_top_level()
685 && !B::BOUNDED // only if unbounded we need to use a tick
686 && let Some(tick) = self.location.try_tick()
687 {
688 let out = or_inside_tick(
689 self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
690 other.snapshot(&tick, nondet!(/** eventually stabilizes */)),
691 )
692 .latest();
693
694 Optional::new(
695 out.location.clone(),
696 out.ir_node.replace(HydroNode::Placeholder),
697 )
698 } else {
699 Optional::new(
700 self.location.clone(),
701 HydroNode::ChainFirst {
702 first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
703 second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
704 metadata: self.location.new_node_metadata(Self::collection_kind()),
705 },
706 )
707 }
708 }
709
710 /// Gets the contents of `self` when it has a value, otherwise passes through `other`.
711 ///
712 /// Like [`Option::unwrap_or`], this is helpful for defining a fallback for an [`Optional`].
713 /// If the fallback is not always defined (an [`Optional`]), you can use [`Optional::or`].
714 ///
715 /// If the inputs are [`Unbounded`], the output will be asynchronously updated as the contents
716 /// of the inputs change (including to/from null states).
717 ///
718 /// # Example
719 /// ```rust
720 /// # #[cfg(feature = "deploy")] {
721 /// # use hydro_lang::prelude::*;
722 /// # use futures::StreamExt;
723 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
724 /// let tick = process.tick();
725 /// // ticks are lazy by default, forces the later ticks to run
726 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
727 ///
728 /// let some_first_tick = tick.optional_first_tick(q!(123));
729 /// some_first_tick
730 /// .unwrap_or(tick.singleton(q!(456)))
731 /// .all_ticks()
732 /// # }, |mut stream| async move {
733 /// // [123 /* first tick */, 456 /* second tick */, 456 /* third tick */, 456, ...]
734 /// # for w in vec![123, 456, 456, 456] {
735 /// # assert_eq!(stream.next().await.unwrap(), w);
736 /// # }
737 /// # }));
738 /// # }
739 /// ```
740 pub fn unwrap_or(self, other: Singleton<T, L, B>) -> Singleton<T, L, B> {
741 let res_option = self.or(other.into());
742 Singleton::new(
743 res_option.location.clone(),
744 HydroNode::Cast {
745 inner: Box::new(res_option.ir_node.replace(HydroNode::Placeholder)),
746 metadata: res_option
747 .location
748 .new_node_metadata(Singleton::<T, L, B>::collection_kind()),
749 },
750 )
751 }
752
753 /// Gets the contents of `self` when it has a value, otherwise returns the default value of `T`.
754 ///
755 /// Like [`Option::unwrap_or_default`], this is helpful for defining a fallback for an
756 /// [`Optional`] when the default value of the type is a suitable fallback.
757 ///
758 /// # Example
759 /// ```rust
760 /// # #[cfg(feature = "deploy")] {
761 /// # use hydro_lang::prelude::*;
762 /// # use futures::StreamExt;
763 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
764 /// let tick = process.tick();
765 /// // ticks are lazy by default, forces the later ticks to run
766 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
767 ///
768 /// let some_first_tick = tick.optional_first_tick(q!(123i32));
769 /// some_first_tick.unwrap_or_default().all_ticks()
770 /// # }, |mut stream| async move {
771 /// // [123 /* first tick */, 0 /* second tick */, 0 /* third tick */, 0, ...]
772 /// # for w in vec![123, 0, 0, 0] {
773 /// # assert_eq!(stream.next().await.unwrap(), w);
774 /// # }
775 /// # }));
776 /// # }
777 /// ```
778 pub fn unwrap_or_default(self) -> Singleton<T, L, B>
779 where
780 T: Default + Clone,
781 {
782 self.into_singleton().map(q!(|v| v.unwrap_or_default()))
783 }
784
785 /// Converts this optional into a [`Singleton`] with a Rust [`Option`] as its contents.
786 ///
787 /// Useful for writing custom Rust code that needs to interact with both the null and non-null
788 /// states of the [`Optional`]. When possible, you should use the native APIs on [`Optional`]
789 /// so that Hydro can skip any computation on null values.
790 ///
791 /// # Example
792 /// ```rust
793 /// # #[cfg(feature = "deploy")] {
794 /// # use hydro_lang::prelude::*;
795 /// # use futures::StreamExt;
796 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
797 /// let tick = process.tick();
798 /// // ticks are lazy by default, forces the later ticks to run
799 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
800 ///
801 /// let some_first_tick = tick.optional_first_tick(q!(123));
802 /// some_first_tick.into_singleton().all_ticks()
803 /// # }, |mut stream| async move {
804 /// // [Some(123) /* first tick */, None /* second tick */, None /* third tick */, None, ...]
805 /// # for w in vec![Some(123), None, None, None] {
806 /// # assert_eq!(stream.next().await.unwrap(), w);
807 /// # }
808 /// # }));
809 /// # }
810 /// ```
811 pub fn into_singleton(self) -> Singleton<Option<T>, L, B>
812 where
813 T: Clone,
814 {
815 let none: syn::Expr = parse_quote!(::std::option::Option::None);
816
817 let none_singleton = Singleton::new(
818 self.location.clone(),
819 HydroNode::SingletonSource {
820 value: none.into(),
821 first_tick_only: false,
822 metadata: self
823 .location
824 .new_node_metadata(Singleton::<Option<T>, L, B>::collection_kind()),
825 },
826 );
827
828 self.map(q!(|v| Some(v))).unwrap_or(none_singleton)
829 }
830
831 /// Returns a [`Singleton`] containing `true` if this optional has a value, `false` otherwise.
832 ///
833 /// # Example
834 /// ```rust
835 /// # #[cfg(feature = "deploy")] {
836 /// # use hydro_lang::prelude::*;
837 /// # use futures::StreamExt;
838 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
839 /// let tick = process.tick();
840 /// // ticks are lazy by default, forces the second tick to run
841 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
842 ///
843 /// let some_first_tick = tick.optional_first_tick(q!(42));
844 /// some_first_tick.is_some().all_ticks()
845 /// # }, |mut stream| async move {
846 /// // [true /* first tick */, false /* second tick */, ...]
847 /// # for w in vec![true, false] {
848 /// # assert_eq!(stream.next().await.unwrap(), w);
849 /// # }
850 /// # }));
851 /// # }
852 /// ```
853 #[expect(clippy::wrong_self_convention, reason = "Stream naming")]
854 pub fn is_some(self) -> Singleton<bool, L, B> {
855 self.map(q!(|_| ()))
856 .into_singleton()
857 .map(q!(|o| o.is_some()))
858 }
859
860 /// Returns a [`Singleton`] containing `true` if this optional is null, `false` otherwise.
861 ///
862 /// # Example
863 /// ```rust
864 /// # #[cfg(feature = "deploy")] {
865 /// # use hydro_lang::prelude::*;
866 /// # use futures::StreamExt;
867 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
868 /// let tick = process.tick();
869 /// // ticks are lazy by default, forces the second tick to run
870 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
871 ///
872 /// let some_first_tick = tick.optional_first_tick(q!(42));
873 /// some_first_tick.is_none().all_ticks()
874 /// # }, |mut stream| async move {
875 /// // [false /* first tick */, true /* second tick */, ...]
876 /// # for w in vec![false, true] {
877 /// # assert_eq!(stream.next().await.unwrap(), w);
878 /// # }
879 /// # }));
880 /// # }
881 /// ```
882 #[expect(clippy::wrong_self_convention, reason = "Stream naming")]
883 pub fn is_none(self) -> Singleton<bool, L, B> {
884 self.map(q!(|_| ()))
885 .into_singleton()
886 .map(q!(|o| o.is_none()))
887 }
888
889 /// Returns a [`Singleton`] containing `true` if both optionals are non-null and their
890 /// values are equal, `false` otherwise (including when either is null).
891 ///
892 /// # Example
893 /// ```rust
894 /// # #[cfg(feature = "deploy")] {
895 /// # use hydro_lang::prelude::*;
896 /// # use futures::StreamExt;
897 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
898 /// let tick = process.tick();
899 /// // ticks are lazy by default, forces the second tick to run
900 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
901 ///
902 /// let a = tick.optional_first_tick(q!(5)); // Some(5), None
903 /// let b = tick.optional_first_tick(q!(5)); // Some(5), None
904 /// a.is_some_and_equals(b).all_ticks()
905 /// # }, |mut stream| async move {
906 /// // [true, false]
907 /// # for w in vec![true, false] {
908 /// # assert_eq!(stream.next().await.unwrap(), w);
909 /// # }
910 /// # }));
911 /// # }
912 /// ```
913 #[expect(clippy::wrong_self_convention, reason = "Stream naming")]
914 pub fn is_some_and_equals(self, other: Optional<T, L, B>) -> Singleton<bool, L, B>
915 where
916 T: PartialEq + Clone,
917 B: IsBounded,
918 {
919 self.into_singleton()
920 .zip(other.into_singleton())
921 .map(q!(|(a, b)| a.is_some() && a == b))
922 }
923
924 /// An operator which allows you to "name" a `HydroNode`.
925 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
926 pub fn ir_node_named(self, name: &str) -> Optional<T, L, B> {
927 {
928 let mut node = self.ir_node.borrow_mut();
929 let metadata = node.metadata_mut();
930 metadata.tag = Some(name.to_owned());
931 }
932 self
933 }
934
935 /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
936 /// implies that `B == Bounded`.
937 pub fn make_bounded(self) -> Optional<T, L, Bounded>
938 where
939 B: IsBounded,
940 {
941 Optional::new(
942 self.location.clone(),
943 self.ir_node.replace(HydroNode::Placeholder),
944 )
945 }
946
947 /// Clones this bounded optional into a tick, returning a optional that has the
948 /// same value as the outer optional. Because the outer optional is bounded, this
949 /// is deterministic because there is only a single immutable version.
950 pub fn clone_into_tick(self, tick: &Tick<L>) -> Optional<T, Tick<L>, Bounded>
951 where
952 B: IsBounded,
953 T: Clone,
954 {
955 // TODO(shadaj): avoid printing simulator logs for this snapshot
956 self.snapshot(
957 tick,
958 nondet!(/** bounded top-level optional so deterministic */),
959 )
960 }
961
962 /// Converts this optional into a [`Stream`] containing a single element, the value, if it is
963 /// non-null. Otherwise, the stream is empty.
964 ///
965 /// # Example
966 /// ```rust
967 /// # #[cfg(feature = "deploy")] {
968 /// # use hydro_lang::prelude::*;
969 /// # use futures::StreamExt;
970 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
971 /// # let tick = process.tick();
972 /// # // ticks are lazy by default, forces the second tick to run
973 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
974 /// # let batch_first_tick = process
975 /// # .source_iter(q!(vec![]))
976 /// # .batch(&tick, nondet!(/** test */));
977 /// # let batch_second_tick = process
978 /// # .source_iter(q!(vec![123, 456]))
979 /// # .batch(&tick, nondet!(/** test */))
980 /// # .defer_tick(); // appears on the second tick
981 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
982 /// input_batch // first tick: [], second tick: [123, 456]
983 /// .clone()
984 /// .max()
985 /// .into_stream()
986 /// .chain(input_batch)
987 /// .all_ticks()
988 /// # }, |mut stream| async move {
989 /// // [456, 123, 456]
990 /// # for w in vec![456, 123, 456] {
991 /// # assert_eq!(stream.next().await.unwrap(), w);
992 /// # }
993 /// # }));
994 /// # }
995 /// ```
996 pub fn into_stream(self) -> Stream<T, L, Bounded, TotalOrder, ExactlyOnce>
997 where
998 B: IsBounded,
999 {
1000 Stream::new(
1001 self.location.clone(),
1002 HydroNode::Cast {
1003 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1004 metadata: self.location.new_node_metadata(Stream::<
1005 T,
1006 Tick<L>,
1007 Bounded,
1008 TotalOrder,
1009 ExactlyOnce,
1010 >::collection_kind()),
1011 },
1012 )
1013 }
1014
1015 /// Filters this optional, passing through the value if the boolean signal is `true`,
1016 /// otherwise the output is null.
1017 ///
1018 /// # Example
1019 /// ```rust
1020 /// # #[cfg(feature = "deploy")] {
1021 /// # use hydro_lang::prelude::*;
1022 /// # use futures::StreamExt;
1023 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1024 /// let tick = process.tick();
1025 /// // ticks are lazy by default, forces the second tick to run
1026 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1027 ///
1028 /// let some_first_tick = tick.optional_first_tick(q!(()));
1029 /// let signal = some_first_tick.is_some(); // true on first tick, false on second
1030 /// let batch_first_tick = process
1031 /// .source_iter(q!(vec![456]))
1032 /// .batch(&tick, nondet!(/** test */));
1033 /// let batch_second_tick = process
1034 /// .source_iter(q!(vec![789]))
1035 /// .batch(&tick, nondet!(/** test */))
1036 /// .defer_tick();
1037 /// batch_first_tick.chain(batch_second_tick).first()
1038 /// .filter_if(signal)
1039 /// .unwrap_or(tick.singleton(q!(0)))
1040 /// .all_ticks()
1041 /// # }, |mut stream| async move {
1042 /// // [456, 0]
1043 /// # for w in vec![456, 0] {
1044 /// # assert_eq!(stream.next().await.unwrap(), w);
1045 /// # }
1046 /// # }));
1047 /// # }
1048 /// ```
1049 pub fn filter_if(self, signal: Singleton<bool, L, B>) -> Optional<T, L, B>
1050 where
1051 B: IsBounded,
1052 {
1053 self.zip(signal.filter(q!(|b| *b))).map(q!(|(d, _)| d))
1054 }
1055
1056 /// Filters this optional, passing through the optional value if it is non-null **and** the
1057 /// argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is null.
1058 ///
1059 /// Useful for conditionally processing, such as only emitting an optional's value outside
1060 /// a tick if some other condition is satisfied.
1061 ///
1062 /// # Example
1063 /// ```rust
1064 /// # #[cfg(feature = "deploy")] {
1065 /// # use hydro_lang::prelude::*;
1066 /// # use futures::StreamExt;
1067 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1068 /// let tick = process.tick();
1069 /// // ticks are lazy by default, forces the second tick to run
1070 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1071 ///
1072 /// let batch_first_tick = process
1073 /// .source_iter(q!(vec![]))
1074 /// .batch(&tick, nondet!(/** test */));
1075 /// let batch_second_tick = process
1076 /// .source_iter(q!(vec![456]))
1077 /// .batch(&tick, nondet!(/** test */))
1078 /// .defer_tick(); // appears on the second tick
1079 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
1080 /// batch_first_tick.chain(batch_second_tick).first()
1081 /// .filter_if_some(some_on_first_tick)
1082 /// .unwrap_or(tick.singleton(q!(789)))
1083 /// .all_ticks()
1084 /// # }, |mut stream| async move {
1085 /// // [789, 789]
1086 /// # for w in vec![789, 789] {
1087 /// # assert_eq!(stream.next().await.unwrap(), w);
1088 /// # }
1089 /// # }));
1090 /// # }
1091 /// ```
1092 #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
1093 pub fn filter_if_some<U>(self, signal: Optional<U, L, B>) -> Optional<T, L, B>
1094 where
1095 B: IsBounded,
1096 {
1097 self.filter_if(signal.is_some())
1098 }
1099
1100 /// Filters this optional, passing through the optional value if it is non-null **and** the
1101 /// argument (a [`Bounded`] [`Optional`]`) is _null_, otherwise the output is null.
1102 ///
1103 /// Useful for conditionally processing, such as only emitting an optional's value outside
1104 /// a tick if some other condition is satisfied.
1105 ///
1106 /// # Example
1107 /// ```rust
1108 /// # #[cfg(feature = "deploy")] {
1109 /// # use hydro_lang::prelude::*;
1110 /// # use futures::StreamExt;
1111 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1112 /// let tick = process.tick();
1113 /// // ticks are lazy by default, forces the second tick to run
1114 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1115 ///
1116 /// let batch_first_tick = process
1117 /// .source_iter(q!(vec![]))
1118 /// .batch(&tick, nondet!(/** test */));
1119 /// let batch_second_tick = process
1120 /// .source_iter(q!(vec![456]))
1121 /// .batch(&tick, nondet!(/** test */))
1122 /// .defer_tick(); // appears on the second tick
1123 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
1124 /// batch_first_tick.chain(batch_second_tick).first()
1125 /// .filter_if_none(some_on_first_tick)
1126 /// .unwrap_or(tick.singleton(q!(789)))
1127 /// .all_ticks()
1128 /// # }, |mut stream| async move {
1129 /// // [789, 789]
1130 /// # for w in vec![789, 456] {
1131 /// # assert_eq!(stream.next().await.unwrap(), w);
1132 /// # }
1133 /// # }));
1134 /// # }
1135 /// ```
1136 #[deprecated(note = "use `filter_if` with `!Optional::is_some()` instead")]
1137 pub fn filter_if_none<U>(self, other: Optional<U, L, B>) -> Optional<T, L, B>
1138 where
1139 B: IsBounded,
1140 {
1141 self.filter_if(other.is_none())
1142 }
1143
1144 /// If `self` is null, emits a null optional, but if it non-null, emits `value`.
1145 ///
1146 /// Useful for gating the release of a [`Singleton`] on a condition of the [`Optional`]
1147 /// having a value, such as only releasing a piece of state if the node is the leader.
1148 ///
1149 /// # Example
1150 /// ```rust
1151 /// # #[cfg(feature = "deploy")] {
1152 /// # use hydro_lang::prelude::*;
1153 /// # use futures::StreamExt;
1154 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1155 /// let tick = process.tick();
1156 /// // ticks are lazy by default, forces the second tick to run
1157 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1158 ///
1159 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
1160 /// some_on_first_tick
1161 /// .if_some_then(tick.singleton(q!(456)))
1162 /// .unwrap_or(tick.singleton(q!(123)))
1163 /// # .all_ticks()
1164 /// # }, |mut stream| async move {
1165 /// // 456 (first tick) ~> 123 (second tick onwards)
1166 /// # for w in vec![456, 123, 123] {
1167 /// # assert_eq!(stream.next().await.unwrap(), w);
1168 /// # }
1169 /// # }));
1170 /// # }
1171 /// ```
1172 #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
1173 pub fn if_some_then<U>(self, value: Singleton<U, L, B>) -> Optional<U, L, B>
1174 where
1175 B: IsBounded,
1176 {
1177 value.filter_if(self.is_some())
1178 }
1179}
1180
1181impl<'a, T, L, B: Boundedness> Optional<T, Atomic<L>, B>
1182where
1183 L: Location<'a> + NoTick,
1184{
1185 /// Returns an optional value corresponding to the latest snapshot of the optional
1186 /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
1187 /// at least all relevant data that contributed to the snapshot at tick `t`. Furthermore,
1188 /// all snapshots of this optional into the atomic-associated tick will observe the
1189 /// same value each tick.
1190 ///
1191 /// # Non-Determinism
1192 /// Because this picks a snapshot of a optional whose value is continuously changing,
1193 /// the output optional has a non-deterministic value since the snapshot can be at an
1194 /// arbitrary point in time.
1195 pub fn snapshot_atomic(self, tick: &Tick<L>, _nondet: NonDet) -> Optional<T, Tick<L>, Bounded> {
1196 Optional::new(
1197 tick.clone(),
1198 HydroNode::Batch {
1199 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1200 metadata: tick
1201 .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
1202 },
1203 )
1204 }
1205
1206 /// Returns this optional back into a top-level, asynchronous execution context where updates
1207 /// to the value will be asynchronously propagated.
1208 pub fn end_atomic(self) -> Optional<T, L, B> {
1209 Optional::new(
1210 self.location.tick.l.clone(),
1211 HydroNode::EndAtomic {
1212 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1213 metadata: self
1214 .location
1215 .tick
1216 .l
1217 .new_node_metadata(Optional::<T, L, B>::collection_kind()),
1218 },
1219 )
1220 }
1221}
1222
1223impl<'a, T, L, B: Boundedness> Optional<T, L, B>
1224where
1225 L: Location<'a>,
1226{
1227 /// Shifts this optional into an atomic context, which guarantees that any downstream logic
1228 /// will observe the same version of the value and will be executed synchronously before any
1229 /// outputs are yielded (in [`Optional::end_atomic`]).
1230 ///
1231 /// This is useful to enforce local consistency constraints, such as ensuring that several readers
1232 /// see a consistent version of local state (since otherwise each [`Optional::snapshot`] may pick
1233 /// a different version).
1234 pub fn atomic(self) -> Optional<T, Atomic<L>, B> {
1235 let id = self.location.flow_state().borrow_mut().next_clock_id();
1236 let out_location = Atomic {
1237 tick: Tick {
1238 id,
1239 l: self.location.clone(),
1240 },
1241 };
1242 Optional::new(
1243 out_location.clone(),
1244 HydroNode::BeginAtomic {
1245 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1246 metadata: out_location
1247 .new_node_metadata(Optional::<T, Atomic<L>, B>::collection_kind()),
1248 },
1249 )
1250 }
1251
1252 /// Given a tick, returns a optional value corresponding to a snapshot of the optional
1253 /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
1254 /// relevant data that contributed to the snapshot at tick `t`.
1255 ///
1256 /// # Non-Determinism
1257 /// Because this picks a snapshot of a optional whose value is continuously changing,
1258 /// the output optional has a non-deterministic value since the snapshot can be at an
1259 /// arbitrary point in time.
1260 pub fn snapshot(self, tick: &Tick<L>, _nondet: NonDet) -> Optional<T, Tick<L>, Bounded> {
1261 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1262 Optional::new(
1263 tick.clone(),
1264 HydroNode::Batch {
1265 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1266 metadata: tick
1267 .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
1268 },
1269 )
1270 }
1271
1272 /// Eagerly samples the optional as fast as possible, returning a stream of snapshots
1273 /// with order corresponding to increasing prefixes of data contributing to the optional.
1274 ///
1275 /// # Non-Determinism
1276 /// At runtime, the optional will be arbitrarily sampled as fast as possible, but due
1277 /// to non-deterministic batching and arrival of inputs, the output stream is
1278 /// non-deterministic.
1279 pub fn sample_eager(self, nondet: NonDet) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
1280 where
1281 L: NoTick,
1282 {
1283 let tick = self.location.tick();
1284 self.snapshot(&tick, nondet).all_ticks().weaken_retries()
1285 }
1286
1287 /// Given a time interval, returns a stream corresponding to snapshots of the optional
1288 /// value taken at various points in time. Because the input optional may be
1289 /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
1290 /// represent the value of the optional given some prefix of the streams leading up to
1291 /// it.
1292 ///
1293 /// # Non-Determinism
1294 /// The output stream is non-deterministic in which elements are sampled, since this
1295 /// is controlled by a clock.
1296 pub fn sample_every(
1297 self,
1298 interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1299 nondet: NonDet,
1300 ) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
1301 where
1302 L: NoTick + NoAtomic,
1303 {
1304 let samples = self.location.source_interval(interval, nondet);
1305 let tick = self.location.tick();
1306
1307 self.snapshot(&tick, nondet)
1308 .filter_if(samples.batch(&tick, nondet).first().is_some())
1309 .all_ticks()
1310 .weaken_retries()
1311 }
1312}
1313
1314impl<'a, T, L> Optional<T, Tick<L>, Bounded>
1315where
1316 L: Location<'a>,
1317{
1318 /// Asynchronously yields the value of this singleton outside the tick as an unbounded stream,
1319 /// which will stream the value computed in _each_ tick as a separate stream element (skipping
1320 /// null values).
1321 ///
1322 /// Unlike [`Optional::latest`], the value computed in each tick is emitted separately,
1323 /// producing one element in the output for each (non-null) tick. This is useful for batched
1324 /// computations, where the results from each tick must be combined together.
1325 ///
1326 /// # Example
1327 /// ```rust
1328 /// # #[cfg(feature = "deploy")] {
1329 /// # use hydro_lang::prelude::*;
1330 /// # use futures::StreamExt;
1331 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1332 /// # let tick = process.tick();
1333 /// # // ticks are lazy by default, forces the second tick to run
1334 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1335 /// # let batch_first_tick = process
1336 /// # .source_iter(q!(vec![]))
1337 /// # .batch(&tick, nondet!(/** test */));
1338 /// # let batch_second_tick = process
1339 /// # .source_iter(q!(vec![1, 2, 3]))
1340 /// # .batch(&tick, nondet!(/** test */))
1341 /// # .defer_tick(); // appears on the second tick
1342 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1343 /// input_batch // first tick: [], second tick: [1, 2, 3]
1344 /// .max()
1345 /// .all_ticks()
1346 /// # }, |mut stream| async move {
1347 /// // [3]
1348 /// # for w in vec![3] {
1349 /// # assert_eq!(stream.next().await.unwrap(), w);
1350 /// # }
1351 /// # }));
1352 /// # }
1353 /// ```
1354 pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
1355 self.into_stream().all_ticks()
1356 }
1357
1358 /// Synchronously yields the value of this optional outside the tick as an unbounded stream,
1359 /// which will stream the value computed in _each_ tick as a separate stream element.
1360 ///
1361 /// Unlike [`Optional::all_ticks`], this preserves synchronous execution, as the output stream
1362 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1363 /// optional's [`Tick`] context.
1364 pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
1365 self.into_stream().all_ticks_atomic()
1366 }
1367
1368 /// Asynchronously yields this optional outside the tick as an unbounded optional, which will
1369 /// be asynchronously updated with the latest value of the optional inside the tick, including
1370 /// whether the optional is null or not.
1371 ///
1372 /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
1373 /// tick that tracks the inner value. This is useful for getting the value as of the
1374 /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
1375 ///
1376 /// # Example
1377 /// ```rust
1378 /// # #[cfg(feature = "deploy")] {
1379 /// # use hydro_lang::prelude::*;
1380 /// # use futures::StreamExt;
1381 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1382 /// # let tick = process.tick();
1383 /// # // ticks are lazy by default, forces the second tick to run
1384 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1385 /// # let batch_first_tick = process
1386 /// # .source_iter(q!(vec![]))
1387 /// # .batch(&tick, nondet!(/** test */));
1388 /// # let batch_second_tick = process
1389 /// # .source_iter(q!(vec![1, 2, 3]))
1390 /// # .batch(&tick, nondet!(/** test */))
1391 /// # .defer_tick(); // appears on the second tick
1392 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1393 /// input_batch // first tick: [], second tick: [1, 2, 3]
1394 /// .max()
1395 /// .latest()
1396 /// # .into_singleton()
1397 /// # .sample_eager(nondet!(/** test */))
1398 /// # }, |mut stream| async move {
1399 /// // asynchronously changes from None ~> 3
1400 /// # for w in vec![None, Some(3)] {
1401 /// # assert_eq!(stream.next().await.unwrap(), w);
1402 /// # }
1403 /// # }));
1404 /// # }
1405 /// ```
1406 pub fn latest(self) -> Optional<T, L, Unbounded> {
1407 Optional::new(
1408 self.location.outer().clone(),
1409 HydroNode::YieldConcat {
1410 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1411 metadata: self
1412 .location
1413 .outer()
1414 .new_node_metadata(Optional::<T, L, Unbounded>::collection_kind()),
1415 },
1416 )
1417 }
1418
1419 /// Synchronously yields this optional outside the tick as an unbounded optional, which will
1420 /// be updated with the latest value of the optional inside the tick.
1421 ///
1422 /// Unlike [`Optional::latest`], this preserves synchronous execution, as the output optional
1423 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1424 /// optional's [`Tick`] context.
1425 pub fn latest_atomic(self) -> Optional<T, Atomic<L>, Unbounded> {
1426 let out_location = Atomic {
1427 tick: self.location.clone(),
1428 };
1429
1430 Optional::new(
1431 out_location.clone(),
1432 HydroNode::YieldConcat {
1433 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1434 metadata: out_location
1435 .new_node_metadata(Optional::<T, Atomic<L>, Unbounded>::collection_kind()),
1436 },
1437 )
1438 }
1439
1440 /// Shifts the state in `self` to the **next tick**, so that the returned optional at tick `T`
1441 /// always has the state of `self` at tick `T - 1`.
1442 ///
1443 /// At tick `0`, the output optional is null, since there is no previous tick.
1444 ///
1445 /// This operator enables stateful iterative processing with ticks, by sending data from one
1446 /// tick to the next. For example, you can use it to compare state across consecutive batches.
1447 ///
1448 /// # Example
1449 /// ```rust
1450 /// # #[cfg(feature = "deploy")] {
1451 /// # use hydro_lang::prelude::*;
1452 /// # use futures::StreamExt;
1453 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1454 /// let tick = process.tick();
1455 /// // ticks are lazy by default, forces the second tick to run
1456 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1457 ///
1458 /// let batch_first_tick = process
1459 /// .source_iter(q!(vec![1, 2]))
1460 /// .batch(&tick, nondet!(/** test */));
1461 /// let batch_second_tick = process
1462 /// .source_iter(q!(vec![3, 4]))
1463 /// .batch(&tick, nondet!(/** test */))
1464 /// .defer_tick(); // appears on the second tick
1465 /// let current_tick_sum = batch_first_tick.chain(batch_second_tick)
1466 /// .reduce(q!(|state, v| *state += v));
1467 ///
1468 /// current_tick_sum.clone().into_singleton().zip(
1469 /// current_tick_sum.defer_tick().into_singleton() // state from previous tick
1470 /// ).all_ticks()
1471 /// # }, |mut stream| async move {
1472 /// // [(Some(3), None) /* first tick */, (Some(7), Some(3)) /* second tick */]
1473 /// # for w in vec![(Some(3), None), (Some(7), Some(3))] {
1474 /// # assert_eq!(stream.next().await.unwrap(), w);
1475 /// # }
1476 /// # }));
1477 /// # }
1478 /// ```
1479 pub fn defer_tick(self) -> Optional<T, Tick<L>, Bounded> {
1480 Optional::new(
1481 self.location.clone(),
1482 HydroNode::DeferTick {
1483 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1484 metadata: self.location.new_node_metadata(Self::collection_kind()),
1485 },
1486 )
1487 }
1488}
1489
#[cfg(test)]
mod tests {
    #[cfg(feature = "deploy")]
    use futures::StreamExt;
    #[cfg(feature = "deploy")]
    use hydro_deploy::Deployment;
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use stageleft::q;

    #[cfg(feature = "deploy")]
    use super::Optional;
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use crate::compile::builder::FlowBuilder;
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use crate::location::Location;
    #[cfg(feature = "deploy")]
    use crate::nondet::nondet;

    /// `or`-ing an inhabited optional with itself must still yield exactly one element.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn optional_or_cardinality() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let node_tick = node.tick();
        let inhabited: Optional<_, _, _> = node_tick.singleton(q!(123)).into();
        let either = inhabited.clone().or(inhabited);
        let counts = either
            .into_stream()
            .count()
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_out = nodes.connect(counts).await;

        deployment.start().await.unwrap();

        let first_count = external_out.next().await.unwrap();
        assert_eq!(first_count, 1);
    }

    /// A null top-level optional turned into a singleton must contribute exactly one
    /// element to every tick's snapshot.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn into_singleton_top_level_none_cardinality() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let node_tick = node.tick();
        let top_level_none = node.singleton(q!(123)).filter(q!(|_| false));
        let as_singleton = top_level_none.into_singleton();

        // Spinning source whose batches are zipped in below.
        let tick_driver = node.spin();

        let snapshot_count = as_singleton
            .snapshot(&node_tick, nondet!(/** test */))
            .into_stream()
            .count();
        let driver_count = tick_driver.batch(&node_tick, nondet!(/** test */)).count();
        let counts = snapshot_count
            .zip(driver_count)
            .map(q!(|(c, _)| c))
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_out = nodes.connect(counts).await;

        deployment.start().await.unwrap();

        // Check the first three ticks.
        for _ in 0..3 {
            assert_eq!(external_out.next().await.unwrap(), 1);
        }
    }

    /// Same as the bounded variant above, but the null optional is derived from an
    /// unbounded (`latest`) value.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn into_singleton_unbounded_top_level_none_cardinality() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let node_tick = node.tick();
        let top_level_none = node_tick.singleton(q!(123)).latest().filter(q!(|_| false));
        let as_singleton = top_level_none.into_singleton();

        // Spinning source whose batches are zipped in below.
        let tick_driver = node.spin();

        let snapshot_count = as_singleton
            .snapshot(&node_tick, nondet!(/** test */))
            .into_stream()
            .count();
        let driver_count = tick_driver.batch(&node_tick, nondet!(/** test */)).count();
        let counts = snapshot_count
            .zip(driver_count)
            .map(q!(|(c, _)| c))
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_out = nodes.connect(counts).await;

        deployment.start().await.unwrap();

        // Check the first three ticks.
        for _ in 0..3 {
            assert_eq!(external_out.next().await.unwrap(), 1);
        }
    }

    /// A non-null top-level optional converted to a stream yields its value exactly once.
    #[cfg(feature = "sim")]
    #[test]
    fn top_level_optional_some_into_stream_no_replay() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let out_recv = node
            .source_iter(q!(vec![1, 2, 3, 4]))
            .fold(q!(|| 0), q!(|a, b| *a += b))
            .filter(q!(|_| true))
            .into_stream()
            .sim_output();

        flow.sim().exhaustive(async || {
            out_recv.assert_yields_only([10]).await;
        });
    }

    /// A null top-level optional converted to a stream yields nothing at all.
    #[cfg(feature = "sim")]
    #[test]
    fn top_level_optional_none_into_stream_no_replay() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let out_recv = node
            .source_iter(q!(vec![1, 2, 3, 4]))
            .fold(q!(|| 0), q!(|a, b| *a += b))
            .filter(q!(|_| false))
            .into_stream()
            .sim_output();

        flow.sim().exhaustive(async || {
            out_recv.assert_yields_only([] as [i32; 0]).await;
        });
    }
}