Skip to main content

hydro_lang/live_collections/stream/
networking.rs

1//! Networking APIs for [`Stream`].
2
3use std::marker::PhantomData;
4
5use serde::Serialize;
6use serde::de::DeserializeOwned;
7use stageleft::{q, quote_type};
8use syn::parse_quote;
9
10use super::{ExactlyOnce, MinOrder, Ordering, Stream, TotalOrder};
11use crate::compile::ir::{DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot};
12use crate::live_collections::boundedness::{Boundedness, Unbounded};
13use crate::live_collections::keyed_singleton::KeyedSingleton;
14use crate::live_collections::keyed_stream::KeyedStream;
15use crate::live_collections::sliced::sliced;
16use crate::live_collections::stream::Retries;
17#[cfg(feature = "sim")]
18use crate::location::LocationKey;
19#[cfg(stageleft_runtime)]
20use crate::location::dynamic::DynLocation;
21use crate::location::external_process::ExternalBincodeStream;
22use crate::location::{Cluster, External, Location, MemberId, MembershipEvent, NoTick, Process};
23use crate::networking::{NetworkFor, TCP};
24use crate::nondet::NonDet;
25#[cfg(feature = "sim")]
26use crate::sim::SimReceiver;
27use crate::staging_util::get_this_crate;
28
29// same as the one in `hydro_std`, but internal use only
30fn track_membership<'a, C, L: Location<'a> + NoTick>(
31    membership: KeyedStream<MemberId<C>, MembershipEvent, L, Unbounded>,
32) -> KeyedSingleton<MemberId<C>, bool, L, Unbounded> {
33    membership.fold(
34        q!(|| false),
35        q!(|present, event| {
36            match event {
37                MembershipEvent::Joined => *present = true,
38                MembershipEvent::Left => *present = false,
39            }
40        }),
41    )
42}
43
44fn serialize_bincode_with_type(is_demux: bool, t_type: &syn::Type) -> syn::Expr {
45    let root = get_this_crate();
46
47    if is_demux {
48        parse_quote! {
49            #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(#root::__staged::location::MemberId<_>, #t_type), _>(
50                |(id, data)| {
51                    (id.into_tagless(), #root::runtime_support::bincode::serialize(&data).unwrap().into())
52                }
53            )
54        }
55    } else {
56        parse_quote! {
57            #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#t_type, _>(
58                |data| {
59                    #root::runtime_support::bincode::serialize(&data).unwrap().into()
60                }
61            )
62        }
63    }
64}
65
66pub(crate) fn serialize_bincode<T: Serialize>(is_demux: bool) -> syn::Expr {
67    serialize_bincode_with_type(is_demux, &quote_type::<T>())
68}
69
70fn deserialize_bincode_with_type(tagged: Option<&syn::Type>, t_type: &syn::Type) -> syn::Expr {
71    let root = get_this_crate();
72    if let Some(c_type) = tagged {
73        parse_quote! {
74            |res| {
75                let (id, b) = res.unwrap();
76                (#root::__staged::location::MemberId::<#c_type>::from_tagless(id as #root::__staged::location::TaglessMemberId), #root::runtime_support::bincode::deserialize::<#t_type>(&b).unwrap())
77            }
78        }
79    } else {
80        parse_quote! {
81            |res| {
82                #root::runtime_support::bincode::deserialize::<#t_type>(&res.unwrap()).unwrap()
83            }
84        }
85    }
86}
87
88pub(crate) fn deserialize_bincode<T: DeserializeOwned>(tagged: Option<&syn::Type>) -> syn::Expr {
89    deserialize_bincode_with_type(tagged, &quote_type::<T>())
90}
91
92impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Process<'a, L>, B, O, R> {
93    #[deprecated = "use Stream::send(..., TCP.fail_stop().bincode()) instead"]
94    /// "Moves" elements of this stream to a new distributed location by sending them over the network,
95    /// using [`bincode`] to serialize/deserialize messages.
96    ///
97    /// The returned stream captures the elements received at the destination, where values will
98    /// asynchronously arrive over the network. Sending from a [`Process`] to another [`Process`]
99    /// preserves ordering and retries guarantees by using a single TCP channel to send the values. The
100    /// recipient is guaranteed to receive a _prefix_ or the sent messages; if the TCP connection is
101    /// dropped no further messages will be sent.
102    ///
103    /// # Example
104    /// ```rust
105    /// # #[cfg(feature = "deploy")] {
106    /// # use hydro_lang::prelude::*;
107    /// # use futures::StreamExt;
108    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p_out| {
109    /// let p1 = flow.process::<()>();
110    /// let numbers: Stream<_, Process<_>, Bounded> = p1.source_iter(q!(vec![1, 2, 3]));
111    /// let p2 = flow.process::<()>();
112    /// let on_p2: Stream<_, Process<_>, Unbounded> = numbers.send_bincode(&p2);
113    /// // 1, 2, 3
114    /// # on_p2.send_bincode(&p_out)
115    /// # }, |mut stream| async move {
116    /// # for w in 1..=3 {
117    /// #     assert_eq!(stream.next().await, Some(w));
118    /// # }
119    /// # }));
120    /// # }
121    /// ```
122    pub fn send_bincode<L2>(
123        self,
124        other: &Process<'a, L2>,
125    ) -> Stream<T, Process<'a, L2>, Unbounded, O, R>
126    where
127        T: Serialize + DeserializeOwned,
128    {
129        self.send(other, TCP.fail_stop().bincode())
130    }
131
132    /// "Moves" elements of this stream to a new distributed location by sending them over the network,
133    /// using the configuration in `via` to set up the message transport.
134    ///
135    /// The returned stream captures the elements received at the destination, where values will
136    /// asynchronously arrive over the network. Sending from a [`Process`] to another [`Process`]
137    /// preserves ordering and retries guarantees when using a single TCP channel to send the values.
138    /// The recipient is guaranteed to receive a _prefix_ or the sent messages; if the connection is
139    /// dropped no further messages will be sent.
140    ///
141    /// # Example
142    /// ```rust
143    /// # #[cfg(feature = "deploy")] {
144    /// # use hydro_lang::prelude::*;
145    /// # use futures::StreamExt;
146    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p_out| {
147    /// let p1 = flow.process::<()>();
148    /// let numbers: Stream<_, Process<_>, Bounded> = p1.source_iter(q!(vec![1, 2, 3]));
149    /// let p2 = flow.process::<()>();
150    /// let on_p2: Stream<_, Process<_>, Unbounded> = numbers.send(&p2, TCP.fail_stop().bincode());
151    /// // 1, 2, 3
152    /// # on_p2.send(&p_out, TCP.fail_stop().bincode())
153    /// # }, |mut stream| async move {
154    /// # for w in 1..=3 {
155    /// #     assert_eq!(stream.next().await, Some(w));
156    /// # }
157    /// # }));
158    /// # }
159    /// ```
160    pub fn send<L2, N: NetworkFor<T>>(
161        self,
162        to: &Process<'a, L2>,
163        via: N,
164    ) -> Stream<T, Process<'a, L2>, Unbounded, <O as MinOrder<N::OrderingGuarantee>>::Min, R>
165    where
166        T: Serialize + DeserializeOwned,
167        O: MinOrder<N::OrderingGuarantee>,
168    {
169        let serialize_pipeline = Some(N::serialize_thunk(false));
170        let deserialize_pipeline = Some(N::deserialize_thunk(None));
171
172        let name = via.name();
173        if to.multiversioned() && name.is_none() {
174            panic!(
175                "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
176            );
177        }
178
179        Stream::new(
180            to.clone(),
181            HydroNode::Network {
182                name: name.map(ToOwned::to_owned),
183                networking_info: N::networking_info(),
184                serialize_fn: serialize_pipeline.map(|e| e.into()),
185                instantiate_fn: DebugInstantiate::Building,
186                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
187                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
188                metadata: to.new_node_metadata(Stream::<
189                    T,
190                    Process<'a, L2>,
191                    Unbounded,
192                    <O as MinOrder<N::OrderingGuarantee>>::Min,
193                    R,
194                >::collection_kind()),
195            },
196        )
197    }
198
199    #[deprecated = "use Stream::broadcast(..., TCP.fail_stop().bincode()) instead"]
200    /// Broadcasts elements of this stream to all members of a cluster by sending them over the network,
201    /// using [`bincode`] to serialize/deserialize messages.
202    ///
203    /// Each element in the stream will be sent to **every** member of the cluster based on the latest
204    /// membership information. This is a common pattern in distributed systems for broadcasting data to
205    /// all nodes in a cluster. Unlike [`Stream::demux_bincode`], which requires `(MemberId, T)` tuples to
206    /// target specific members, `broadcast_bincode` takes a stream of **only data elements** and sends
207    /// each element to all cluster members.
208    ///
209    /// # Non-Determinism
210    /// The set of cluster members may asynchronously change over time. Each element is only broadcast
211    /// to the current cluster members _at that point in time_. Depending on when we are notified of
212    /// membership changes, we will broadcast each element to different members.
213    ///
214    /// # Example
215    /// ```rust
216    /// # #[cfg(feature = "deploy")] {
217    /// # use hydro_lang::prelude::*;
218    /// # use futures::StreamExt;
219    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
220    /// let p1 = flow.process::<()>();
221    /// let workers: Cluster<()> = flow.cluster::<()>();
222    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![123]));
223    /// let on_worker: Stream<_, Cluster<_>, _> = numbers.broadcast_bincode(&workers, nondet!(/** assuming stable membership */));
224    /// # on_worker.send_bincode(&p2).entries()
225    /// // if there are 4 members in the cluster, each receives one element
226    /// // - MemberId::<()>(0): [123]
227    /// // - MemberId::<()>(1): [123]
228    /// // - MemberId::<()>(2): [123]
229    /// // - MemberId::<()>(3): [123]
230    /// # }, |mut stream| async move {
231    /// # let mut results = Vec::new();
232    /// # for w in 0..4 {
233    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
234    /// # }
235    /// # results.sort();
236    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 123)", "(MemberId::<()>(1), 123)", "(MemberId::<()>(2), 123)", "(MemberId::<()>(3), 123)"]);
237    /// # }));
238    /// # }
239    /// ```
240    pub fn broadcast_bincode<L2: 'a>(
241        self,
242        other: &Cluster<'a, L2>,
243        nondet_membership: NonDet,
244    ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
245    where
246        T: Clone + Serialize + DeserializeOwned,
247    {
248        self.broadcast(other, TCP.fail_stop().bincode(), nondet_membership)
249    }
250
    /// Broadcasts elements of this stream to all members of a cluster by sending them over the network,
    /// using the configuration in `via` to set up the message transport.
    ///
    /// Each element in the stream will be sent to **every** member of the cluster based on the latest
    /// membership information. This is a common pattern in distributed systems for broadcasting data to
    /// all nodes in a cluster. Unlike [`Stream::demux`], which requires `(MemberId, T)` tuples to
    /// target specific members, `broadcast` takes a stream of **only data elements** and sends
    /// each element to all cluster members.
    ///
    /// Internally, the cluster's membership events are folded into a per-member presence flag, each
    /// element is paired with the members currently marked present, and the result is routed with
    /// `demux(to, via)`.
    ///
    /// # Non-Determinism
    /// The set of cluster members may asynchronously change over time. Each element is only broadcast
    /// to the current cluster members _at that point in time_. Depending on when we are notified of
    /// membership changes, we will broadcast each element to different members.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
    /// let p1 = flow.process::<()>();
    /// let workers: Cluster<()> = flow.cluster::<()>();
    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![123]));
    /// let on_worker: Stream<_, Cluster<_>, _> = numbers.broadcast(&workers, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */));
    /// # on_worker.send(&p2, TCP.fail_stop().bincode()).entries()
    /// // if there are 4 members in the cluster, each receives one element
    /// // - MemberId::<()>(0): [123]
    /// // - MemberId::<()>(1): [123]
    /// // - MemberId::<()>(2): [123]
    /// // - MemberId::<()>(3): [123]
    /// # }, |mut stream| async move {
    /// # let mut results = Vec::new();
    /// # for w in 0..4 {
    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 123)", "(MemberId::<()>(1), 123)", "(MemberId::<()>(2), 123)", "(MemberId::<()>(3), 123)"]);
    /// # }));
    /// # }
    /// ```
    pub fn broadcast<L2: 'a, N: NetworkFor<T>>(
        self,
        to: &Cluster<'a, L2>,
        via: N,
        nondet_membership: NonDet,
    ) -> Stream<T, Cluster<'a, L2>, Unbounded, <O as MinOrder<N::OrderingGuarantee>>::Min, R>
    where
        T: Clone + Serialize + DeserializeOwned,
        O: MinOrder<N::OrderingGuarantee>,
    {
        // Per-member presence flags for the destination cluster, derived from its membership events.
        let ids = track_membership(self.location.source_cluster_members(to));
        sliced! {
            // Both inputs are brought into the slice under the same non-determinism guard.
            let members_snapshot = use(ids, nondet_membership);
            let elements = use(self, nondet_membership);

            // Keep only the members whose presence flag is currently `true`.
            let current_members = members_snapshot.filter(q!(|b| *b));
            // Key the elements by the current members so that `demux` can route them
            // (presumably one copy of each element per member — confirm in `repeat_with_keys`).
            elements.repeat_with_keys(current_members)
        }
        .demux(to, via)
    }
311
312    /// Sends the elements of this stream to an external (non-Hydro) process, using [`bincode`]
313    /// serialization. The external process can receive these elements by establishing a TCP
314    /// connection and decoding using [`tokio_util::codec::LengthDelimitedCodec`].
315    ///
316    /// # Example
317    /// ```rust
318    /// # #[cfg(feature = "deploy")] {
319    /// # use hydro_lang::prelude::*;
320    /// # use futures::StreamExt;
321    /// # tokio_test::block_on(async move {
322    /// let mut flow = FlowBuilder::new();
323    /// let process = flow.process::<()>();
324    /// let numbers: Stream<_, Process<_>, Bounded> = process.source_iter(q!(vec![1, 2, 3]));
325    /// let external = flow.external::<()>();
326    /// let external_handle = numbers.send_bincode_external(&external);
327    ///
328    /// let mut deployment = hydro_deploy::Deployment::new();
329    /// let nodes = flow
330    ///     .with_process(&process, deployment.Localhost())
331    ///     .with_external(&external, deployment.Localhost())
332    ///     .deploy(&mut deployment);
333    ///
334    /// deployment.deploy().await.unwrap();
335    /// // establish the TCP connection
336    /// let mut external_recv_stream = nodes.connect(external_handle).await;
337    /// deployment.start().await.unwrap();
338    ///
339    /// for w in 1..=3 {
340    ///     assert_eq!(external_recv_stream.next().await, Some(w));
341    /// }
342    /// # });
343    /// # }
344    /// ```
345    pub fn send_bincode_external<L2>(self, other: &External<L2>) -> ExternalBincodeStream<T, O, R>
346    where
347        T: Serialize + DeserializeOwned,
348    {
349        let serialize_pipeline = Some(serialize_bincode::<T>(false));
350
351        let mut flow_state_borrow = self.location.flow_state().borrow_mut();
352
353        let external_port_id = flow_state_borrow.next_external_port();
354
355        flow_state_borrow.push_root(HydroRoot::SendExternal {
356            to_external_key: other.key,
357            to_port_id: external_port_id,
358            to_many: false,
359            unpaired: true,
360            serialize_fn: serialize_pipeline.map(|e| e.into()),
361            instantiate_fn: DebugInstantiate::Building,
362            input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
363            op_metadata: HydroIrOpMetadata::new(),
364        });
365
366        ExternalBincodeStream {
367            process_key: other.key,
368            port_id: external_port_id,
369            _phantom: PhantomData,
370        }
371    }
372
373    #[cfg(feature = "sim")]
374    /// Sets up a simulation output port for this stream, allowing test code to receive elements
375    /// sent to this stream during simulation.
376    pub fn sim_output(self) -> SimReceiver<T, O, R>
377    where
378        T: Serialize + DeserializeOwned,
379    {
380        let external_location: External<'a, ()> = External {
381            key: LocationKey::FIRST,
382            flow_state: self.location.flow_state().clone(),
383            _phantom: PhantomData,
384        };
385
386        let external = self.send_bincode_external(&external_location);
387
388        SimReceiver(external.port_id, PhantomData)
389    }
390}
391
392impl<'a, T, L: Location<'a> + NoTick, B: Boundedness> Stream<T, L, B, TotalOrder, ExactlyOnce> {
393    /// Creates an external output for embedded deployment mode.
394    ///
395    /// The `name` parameter specifies the name of the field in the generated
396    /// `EmbeddedOutputs` struct that will receive elements from this stream.
397    /// The generated function will accept an `EmbeddedOutputs` struct with an
398    /// `impl FnMut(T)` field with this name.
399    pub fn embedded_output(self, name: impl Into<String>) {
400        let ident = syn::Ident::new(&name.into(), proc_macro2::Span::call_site());
401
402        self.location
403            .flow_state()
404            .borrow_mut()
405            .push_root(HydroRoot::EmbeddedOutput {
406                ident,
407                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
408                op_metadata: HydroIrOpMetadata::new(),
409            });
410    }
411}
412
413impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
414    Stream<(MemberId<L2>, T), Process<'a, L>, B, O, R>
415{
416    #[deprecated = "use Stream::demux(..., TCP.fail_stop().bincode()) instead"]
417    /// Sends elements of this stream to specific members of a cluster, identified by a [`MemberId`],
418    /// using [`bincode`] to serialize/deserialize messages.
419    ///
420    /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
421    /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`],
422    /// this API allows precise targeting of specific cluster members rather than broadcasting to
423    /// all members.
424    ///
425    /// # Example
426    /// ```rust
427    /// # #[cfg(feature = "deploy")] {
428    /// # use hydro_lang::prelude::*;
429    /// # use futures::StreamExt;
430    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
431    /// let p1 = flow.process::<()>();
432    /// let workers: Cluster<()> = flow.cluster::<()>();
433    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
434    /// let on_worker: Stream<_, Cluster<_>, _> = numbers
435    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
436    ///     .demux_bincode(&workers);
437    /// # on_worker.send_bincode(&p2).entries()
438    /// // if there are 4 members in the cluster, each receives one element
439    /// // - MemberId::<()>(0): [0]
440    /// // - MemberId::<()>(1): [1]
441    /// // - MemberId::<()>(2): [2]
442    /// // - MemberId::<()>(3): [3]
443    /// # }, |mut stream| async move {
444    /// # let mut results = Vec::new();
445    /// # for w in 0..4 {
446    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
447    /// # }
448    /// # results.sort();
449    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
450    /// # }));
451    /// # }
452    /// ```
453    pub fn demux_bincode(
454        self,
455        other: &Cluster<'a, L2>,
456    ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
457    where
458        T: Serialize + DeserializeOwned,
459    {
460        self.demux(other, TCP.fail_stop().bincode())
461    }
462
463    /// Sends elements of this stream to specific members of a cluster, identified by a [`MemberId`],
464    /// using the configuration in `via` to set up the message transport.
465    ///
466    /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
467    /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast`],
468    /// this API allows precise targeting of specific cluster members rather than broadcasting to
469    /// all members.
470    ///
471    /// # Example
472    /// ```rust
473    /// # #[cfg(feature = "deploy")] {
474    /// # use hydro_lang::prelude::*;
475    /// # use futures::StreamExt;
476    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
477    /// let p1 = flow.process::<()>();
478    /// let workers: Cluster<()> = flow.cluster::<()>();
479    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
480    /// let on_worker: Stream<_, Cluster<_>, _> = numbers
481    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
482    ///     .demux(&workers, TCP.fail_stop().bincode());
483    /// # on_worker.send(&p2, TCP.fail_stop().bincode()).entries()
484    /// // if there are 4 members in the cluster, each receives one element
485    /// // - MemberId::<()>(0): [0]
486    /// // - MemberId::<()>(1): [1]
487    /// // - MemberId::<()>(2): [2]
488    /// // - MemberId::<()>(3): [3]
489    /// # }, |mut stream| async move {
490    /// # let mut results = Vec::new();
491    /// # for w in 0..4 {
492    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
493    /// # }
494    /// # results.sort();
495    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
496    /// # }));
497    /// # }
498    /// ```
499    pub fn demux<N: NetworkFor<T>>(
500        self,
501        to: &Cluster<'a, L2>,
502        via: N,
503    ) -> Stream<T, Cluster<'a, L2>, Unbounded, <O as MinOrder<N::OrderingGuarantee>>::Min, R>
504    where
505        T: Serialize + DeserializeOwned,
506        O: MinOrder<N::OrderingGuarantee>,
507    {
508        self.into_keyed().demux(to, via)
509    }
510}
511
512impl<'a, T, L, B: Boundedness> Stream<T, Process<'a, L>, B, TotalOrder, ExactlyOnce> {
513    #[deprecated = "use Stream::round_robin(..., TCP.fail_stop().bincode()) instead"]
514    /// Distributes elements of this stream to cluster members in a round-robin fashion, using
515    /// [`bincode`] to serialize/deserialize messages.
516    ///
517    /// This provides load balancing by evenly distributing work across cluster members. The
518    /// distribution is deterministic based on element order - the first element goes to member 0,
519    /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
520    ///
521    /// # Non-Determinism
522    /// The set of cluster members may asynchronously change over time. Each element is distributed
523    /// based on the current cluster membership _at that point in time_. Depending on when cluster
524    /// members join and leave, the round-robin pattern will change. Furthermore, even when the
525    /// membership is stable, the order of members in the round-robin pattern may change across runs.
526    ///
527    /// # Ordering Requirements
528    /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
529    /// order of messages and retries affects the round-robin pattern.
530    ///
531    /// # Example
532    /// ```rust
533    /// # #[cfg(feature = "deploy")] {
534    /// # use hydro_lang::prelude::*;
535    /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce};
536    /// # use futures::StreamExt;
537    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
538    /// let p1 = flow.process::<()>();
539    /// let workers: Cluster<()> = flow.cluster::<()>();
540    /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(vec![1, 2, 3, 4]));
541    /// let on_worker: Stream<_, Cluster<_>, _> = numbers.round_robin_bincode(&workers, nondet!(/** assuming stable membership */));
542    /// on_worker.send_bincode(&p2)
543    /// # .first().values() // we use first to assert that each member gets one element
544    /// // with 4 cluster members, elements are distributed (with a non-deterministic round-robin order):
545    /// // - MemberId::<()>(?): [1]
546    /// // - MemberId::<()>(?): [2]
547    /// // - MemberId::<()>(?): [3]
548    /// // - MemberId::<()>(?): [4]
549    /// # }, |mut stream| async move {
550    /// # let mut results = Vec::new();
551    /// # for w in 0..4 {
552    /// #     results.push(stream.next().await.unwrap());
553    /// # }
554    /// # results.sort();
555    /// # assert_eq!(results, vec![1, 2, 3, 4]);
556    /// # }));
557    /// # }
558    /// ```
559    pub fn round_robin_bincode<L2: 'a>(
560        self,
561        other: &Cluster<'a, L2>,
562        nondet_membership: NonDet,
563    ) -> Stream<T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
564    where
565        T: Serialize + DeserializeOwned,
566    {
567        self.round_robin(other, TCP.fail_stop().bincode(), nondet_membership)
568    }
569
    /// Distributes elements of this stream to cluster members in a round-robin fashion, using
    /// the configuration in `via` to set up the message transport.
    ///
    /// This provides load balancing by evenly distributing work across cluster members. The
    /// distribution is deterministic based on element order - the first element goes to member 0,
    /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
    /// Concretely, the element at position `i` of the stream is sent to the member at index
    /// `i % members.len()` of the list of currently present members.
    ///
    /// # Non-Determinism
    /// The set of cluster members may asynchronously change over time. Each element is distributed
    /// based on the current cluster membership _at that point in time_. Depending on when cluster
    /// members join and leave, the round-robin pattern will change. Furthermore, even when the
    /// membership is stable, the order of members in the round-robin pattern may change across runs.
    ///
    /// # Ordering Requirements
    /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
    /// order of messages and retries affects the round-robin pattern.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce};
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
    /// let p1 = flow.process::<()>();
    /// let workers: Cluster<()> = flow.cluster::<()>();
    /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(vec![1, 2, 3, 4]));
    /// let on_worker: Stream<_, Cluster<_>, _> = numbers.round_robin(&workers, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */));
    /// on_worker.send(&p2, TCP.fail_stop().bincode())
    /// # .first().values() // we use first to assert that each member gets one element
    /// // with 4 cluster members, elements are distributed (with a non-deterministic round-robin order):
    /// // - MemberId::<()>(?): [1]
    /// // - MemberId::<()>(?): [2]
    /// // - MemberId::<()>(?): [3]
    /// // - MemberId::<()>(?): [4]
    /// # }, |mut stream| async move {
    /// # let mut results = Vec::new();
    /// # for w in 0..4 {
    /// #     results.push(stream.next().await.unwrap());
    /// # }
    /// # results.sort();
    /// # assert_eq!(results, vec![1, 2, 3, 4]);
    /// # }));
    /// # }
    /// ```
    pub fn round_robin<L2: 'a, N: NetworkFor<T>>(
        self,
        to: &Cluster<'a, L2>,
        via: N,
        nondet_membership: NonDet,
    ) -> Stream<T, Cluster<'a, L2>, Unbounded, N::OrderingGuarantee, ExactlyOnce>
    where
        T: Serialize + DeserializeOwned,
    {
        // Per-member presence flags for the destination cluster, derived from its membership events.
        let ids = track_membership(self.location.source_cluster_members(to));
        sliced! {
            let members_snapshot = use(ids, nondet_membership);
            // Each element is paired with its index; the index drives the rotation below.
            let elements = use(self.enumerate(), nondet_membership);

            // Gather the members currently marked present into a single `Vec`. Their order is
            // only *assumed* to be total (guarded by `nondet_membership`), which is why the
            // rotation order may differ across runs.
            let current_members = members_snapshot
                .filter(q!(|b| *b))
                .keys()
                .assume_ordering::<TotalOrder>(nondet_membership)
                .collect_vec();

            // NOTE(review): if `current_members` can be an empty `Vec` here (e.g. an element is
            // processed before any member has joined), `data.0 % members.len()` is a remainder
            // by zero and panics — confirm whether that situation is reachable.
            elements
                .cross_singleton(current_members)
                .map(q!(|(data, members)| (
                    members[data.0 % members.len()].clone(),
                    data.1
                )))
        }
        .demux(to, via)
    }
644}
645
646impl<'a, T, L, B: Boundedness> Stream<T, Cluster<'a, L>, B, TotalOrder, ExactlyOnce> {
647    #[deprecated = "use Stream::round_robin(..., TCP.fail_stop().bincode()) instead"]
648    /// Distributes elements of this stream to cluster members in a round-robin fashion, using
649    /// [`bincode`] to serialize/deserialize messages.
    ///
    /// This is a thin wrapper that forwards to [`Stream::round_robin`] with
    /// `TCP.fail_stop().bincode()` as the network configuration.
650    ///
651    /// This provides load balancing by evenly distributing work across cluster members. The
652    /// distribution is deterministic based on element order - the first element goes to member 0,
653    /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
654    ///
655    /// # Non-Determinism
656    /// The set of cluster members may asynchronously change over time. Each element is distributed
657    /// based on the current cluster membership _at that point in time_. Depending on when cluster
658    /// members join and leave, the round-robin pattern will change. Furthermore, even when the
659    /// membership is stable, the order of members in the round-robin pattern may change across runs.
660    ///
661    /// # Ordering Requirements
662    /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
663    /// order of messages and retries affects the round-robin pattern.
664    ///
665    /// # Example
666    /// ```rust
667    /// # #[cfg(feature = "deploy")] {
668    /// # use hydro_lang::prelude::*;
669    /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce, NoOrder};
670    /// # use hydro_lang::location::MemberId;
671    /// # use futures::StreamExt;
672    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
673    /// let p1 = flow.process::<()>();
674    /// let workers1: Cluster<()> = flow.cluster::<()>();
675    /// let workers2: Cluster<()> = flow.cluster::<()>();
676    /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(0..=16));
677    /// let on_worker1: Stream<_, Cluster<_>, _> = numbers.round_robin_bincode(&workers1, nondet!(/** assuming stable membership */));
678    /// let on_worker2: Stream<_, Cluster<_>, _> = on_worker1.round_robin_bincode(&workers2, nondet!(/** assuming stable membership */)).entries().assume_ordering(nondet!(/** assuming stable membership */));
679    /// on_worker2.send_bincode(&p2)
680    /// # .entries()
681    /// # .map(q!(|(w2, (w1, v))| ((w2, w1), v)))
682    /// # }, |mut stream| async move {
683    /// # let mut results = Vec::new();
684    /// # let mut locations = std::collections::HashSet::new();
685    /// # for w in 0..=16 {
686    /// #     let (location, v) = stream.next().await.unwrap();
687    /// #     locations.insert(location);
688    /// #     results.push(v);
689    /// # }
690    /// # results.sort();
691    /// # assert_eq!(results, (0..=16).collect::<Vec<_>>());
692    /// # assert_eq!(locations.len(), 16);
693    /// # }));
694    /// # }
695    /// ```
696    pub fn round_robin_bincode<L2: 'a>(
697        self,
698        other: &Cluster<'a, L2>,
699        nondet_membership: NonDet,
700    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
701    where
702        T: Serialize + DeserializeOwned,
703    {
704        self.round_robin(other, TCP.fail_stop().bincode(), nondet_membership)
705    }
706
707    /// Distributes elements of this stream to cluster members in a round-robin fashion, using
708    /// the configuration in `via` to set up the message transport.
709    ///
710    /// This provides load balancing by evenly distributing work across cluster members. The
711    /// distribution is deterministic based on element order - the first element goes to member 0,
712    /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
713    ///
714    /// # Non-Determinism
715    /// The set of cluster members may asynchronously change over time. Each element is distributed
716    /// based on the current cluster membership _at that point in time_. Depending on when cluster
717    /// members join and leave, the round-robin pattern will change. Furthermore, even when the
718    /// membership is stable, the order of members in the round-robin pattern may change across runs.
719    ///
720    /// # Ordering Requirements
721    /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
722    /// order of messages and retries affects the round-robin pattern.
723    ///
724    /// # Example
725    /// ```rust
726    /// # #[cfg(feature = "deploy")] {
727    /// # use hydro_lang::prelude::*;
728    /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce, NoOrder};
729    /// # use hydro_lang::location::MemberId;
730    /// # use futures::StreamExt;
731    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
732    /// let p1 = flow.process::<()>();
733    /// let workers1: Cluster<()> = flow.cluster::<()>();
734    /// let workers2: Cluster<()> = flow.cluster::<()>();
735    /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(0..=16));
736    /// let on_worker1: Stream<_, Cluster<_>, _> = numbers.round_robin(&workers1, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */));
737    /// let on_worker2: Stream<_, Cluster<_>, _> = on_worker1.round_robin(&workers2, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */)).entries().assume_ordering(nondet!(/** assuming stable membership */));
738    /// on_worker2.send(&p2, TCP.fail_stop().bincode())
739    /// # .entries()
740    /// # .map(q!(|(w2, (w1, v))| ((w2, w1), v)))
741    /// # }, |mut stream| async move {
742    /// # let mut results = Vec::new();
743    /// # let mut locations = std::collections::HashSet::new();
744    /// # for w in 0..=16 {
745    /// #     let (location, v) = stream.next().await.unwrap();
746    /// #     locations.insert(location);
747    /// #     results.push(v);
748    /// # }
749    /// # results.sort();
750    /// # assert_eq!(results, (0..=16).collect::<Vec<_>>());
751    /// # assert_eq!(locations.len(), 16);
752    /// # }));
753    /// # }
754    /// ```
755    pub fn round_robin<L2: 'a, N: NetworkFor<T>>(
756        self,
757        to: &Cluster<'a, L2>,
758        via: N,
759        nondet_membership: NonDet,
760    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, N::OrderingGuarantee, ExactlyOnce>
761    where
762        T: Serialize + DeserializeOwned,
763    {
        // Fold the destination cluster's join/leave events into a per-member presence flag.
764        let ids = track_membership(self.location.source_cluster_members(to));
765        sliced! {
766            let members_snapshot = use(ids, nondet_membership);
            // `enumerate` attaches a running index to each element; it drives the rotation below.
767            let elements = use(self.enumerate(), nondet_membership);
768
            // Keep only members whose flag is `true` and gather their ids into a single `Vec`.
769            let current_members = members_snapshot
770                .filter(q!(|b| *b))
771                .keys()
772                .assume_ordering::<TotalOrder>(nondet_membership)
773                .collect_vec();
774
            // Pick the destination by taking the element's index modulo the member count.
            // NOTE(review): if `members` can be empty here, `% members.len()` is a remainder by
            // zero and would panic — confirm how an empty membership is handled.
775            elements
776                .cross_singleton(current_members)
777                .map(q!(|(data, members)| (
778                    members[data.0 % members.len()].clone(),
779                    data.1
780                )))
781        }
        // Route each `(member, element)` pair to its member over the `via` transport.
782        .demux(to, via)
783    }
784}
785
786impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Cluster<'a, L>, B, O, R> {
787    #[deprecated = "use Stream::send(..., TCP.fail_stop().bincode()) instead"]
788    /// "Moves" elements of this stream from a cluster to a process by sending them over the network,
789    /// using [`bincode`] to serialize/deserialize messages.
    ///
    /// This is a thin wrapper that forwards to [`Stream::send`] with
    /// `TCP.fail_stop().bincode()` as the network configuration.
790    ///
791    /// Each cluster member sends its local stream elements, and they are collected at the destination
792    /// as a [`KeyedStream`] where keys identify the source cluster member.
793    ///
794    /// # Example
795    /// ```rust
796    /// # #[cfg(feature = "deploy")] {
797    /// # use hydro_lang::prelude::*;
798    /// # use futures::StreamExt;
799    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
800    /// let workers: Cluster<()> = flow.cluster::<()>();
801    /// let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
802    /// let all_received = numbers.send_bincode(&process); // KeyedStream<MemberId<()>, i32, ...>
803    /// # all_received.entries()
804    /// # }, |mut stream| async move {
805    /// // if there are 4 members in the cluster, we should receive 4 elements
806    /// // { MemberId::<()>(0): [1], MemberId::<()>(1): [1], MemberId::<()>(2): [1], MemberId::<()>(3): [1] }
807    /// # let mut results = Vec::new();
808    /// # for w in 0..4 {
809    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
810    /// # }
811    /// # results.sort();
812    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 1)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 1)", "(MemberId::<()>(3), 1)"]);
813    /// # }));
814    /// # }
815    /// ```
816    ///
817    /// If you don't need to know the source for each element, you can use `.values()`
818    /// to get just the data:
819    /// ```rust
820    /// # #[cfg(feature = "deploy")] {
821    /// # use hydro_lang::prelude::*;
822    /// # use hydro_lang::live_collections::stream::NoOrder;
823    /// # use futures::StreamExt;
824    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
825    /// # let workers: Cluster<()> = flow.cluster::<()>();
826    /// # let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
827    /// let values: Stream<i32, _, _, NoOrder> = numbers.send_bincode(&process).values();
828    /// # values
829    /// # }, |mut stream| async move {
830    /// # let mut results = Vec::new();
831    /// # for w in 0..4 {
832    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
833    /// # }
834    /// # results.sort();
835    /// // if there are 4 members in the cluster, we should receive 4 elements
836    /// // 1, 1, 1, 1
837    /// # assert_eq!(results, vec!["1", "1", "1", "1"]);
838    /// # }));
839    /// # }
840    /// ```
841    pub fn send_bincode<L2>(
842        self,
843        other: &Process<'a, L2>,
844    ) -> KeyedStream<MemberId<L>, T, Process<'a, L2>, Unbounded, O, R>
845    where
846        T: Serialize + DeserializeOwned,
847    {
848        self.send(other, TCP.fail_stop().bincode())
849    }
850
851    /// "Moves" elements of this stream from a cluster to a process by sending them over the network,
852    /// using the configuration in `via` to set up the message transport.
853    ///
854    /// Each cluster member sends its local stream elements, and they are collected at the destination
855    /// as a [`KeyedStream`] where keys identify the source cluster member.
    ///
    /// # Panics
    /// Panics if `to` is a multiversioned location and `via` does not provide a channel name.
856    ///
857    /// # Example
858    /// ```rust
859    /// # #[cfg(feature = "deploy")] {
860    /// # use hydro_lang::prelude::*;
861    /// # use futures::StreamExt;
862    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
863    /// let workers: Cluster<()> = flow.cluster::<()>();
864    /// let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
865    /// let all_received = numbers.send(&process, TCP.fail_stop().bincode()); // KeyedStream<MemberId<()>, i32, ...>
866    /// # all_received.entries()
867    /// # }, |mut stream| async move {
868    /// // if there are 4 members in the cluster, we should receive 4 elements
869    /// // { MemberId::<()>(0): [1], MemberId::<()>(1): [1], MemberId::<()>(2): [1], MemberId::<()>(3): [1] }
870    /// # let mut results = Vec::new();
871    /// # for w in 0..4 {
872    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
873    /// # }
874    /// # results.sort();
875    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 1)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 1)", "(MemberId::<()>(3), 1)"]);
876    /// # }));
877    /// # }
878    /// ```
879    ///
880    /// If you don't need to know the source for each element, you can use `.values()`
881    /// to get just the data:
882    /// ```rust
883    /// # #[cfg(feature = "deploy")] {
884    /// # use hydro_lang::prelude::*;
885    /// # use hydro_lang::live_collections::stream::NoOrder;
886    /// # use futures::StreamExt;
887    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
888    /// # let workers: Cluster<()> = flow.cluster::<()>();
889    /// # let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
890    /// let values: Stream<i32, _, _, NoOrder> =
891    ///     numbers.send(&process, TCP.fail_stop().bincode()).values();
892    /// # values
893    /// # }, |mut stream| async move {
894    /// # let mut results = Vec::new();
895    /// # for w in 0..4 {
896    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
897    /// # }
898    /// # results.sort();
899    /// // if there are 4 members in the cluster, we should receive 4 elements
900    /// // 1, 1, 1, 1
901    /// # assert_eq!(results, vec!["1", "1", "1", "1"]);
902    /// # }));
903    /// # }
904    /// ```
905    #[expect(clippy::type_complexity, reason = "MinOrder projection in return type")]
906    pub fn send<L2, N: NetworkFor<T>>(
907        self,
908        to: &Process<'a, L2>,
909        via: N,
910    ) -> KeyedStream<
911        MemberId<L>,
912        T,
913        Process<'a, L2>,
914        Unbounded,
915        <O as MinOrder<N::OrderingGuarantee>>::Min,
916        R,
917    >
918    where
919        T: Serialize + DeserializeOwned,
920        O: MinOrder<N::OrderingGuarantee>,
921    {
        // NOTE(review): the `false` argument is presumably the "is demux" flag (no per-element
        // destination id to serialize) — confirm against `NetworkFor::serialize_thunk`.
922        let serialize_pipeline = Some(N::serialize_thunk(false));
923
        // Hand the quoted source-cluster tag type `L` to the deserializer; the stream built below
        // carries `(MemberId<L>, T)` pairs.
924        let deserialize_pipeline = Some(N::deserialize_thunk(Some(&quote_type::<L>())));
925
        // A multiversioned destination requires a named channel, so reject an unnamed one here.
926        let name = via.name();
927        if to.multiversioned() && name.is_none() {
928            panic!(
929                "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
930            );
931        }
932
933        let raw_stream: Stream<
934            (MemberId<L>, T),
935            Process<'a, L2>,
936            Unbounded,
937            <O as MinOrder<N::OrderingGuarantee>>::Min,
938            R,
939        > = Stream::new(
940            to.clone(),
941            HydroNode::Network {
942                name: name.map(ToOwned::to_owned),
943                networking_info: N::networking_info(),
944                serialize_fn: serialize_pipeline.map(|e| e.into()),
945                instantiate_fn: DebugInstantiate::Building,
946                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
                // Take this stream's IR node (leaving a placeholder behind) as the network input.
947                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
948                metadata: to.new_node_metadata(Stream::<
949                    (MemberId<L>, T),
950                    Process<'a, L2>,
951                    Unbounded,
952                    <O as MinOrder<N::OrderingGuarantee>>::Min,
953                    R,
954                >::collection_kind()),
955            },
956        );
957
        // Turn the `(MemberId<L>, T)` pairs into a stream keyed by the sending member.
958        raw_stream.into_keyed()
959    }
960
961    #[deprecated = "use Stream::broadcast(..., TCP.fail_stop().bincode()) instead"]
962    /// Broadcasts elements of this stream at each source member to all members of a destination
963    /// cluster, using [`bincode`] to serialize/deserialize messages.
    ///
    /// This is a thin wrapper that forwards to [`Stream::broadcast`] with
    /// `TCP.fail_stop().bincode()` as the network configuration.
964    ///
965    /// Each source member sends each of its stream elements to **every** member of the cluster
966    /// based on its latest membership information. Unlike [`Stream::demux_bincode`], which requires
967    /// `(MemberId, T)` tuples to target specific members, `broadcast_bincode` takes a stream of
968    /// **only data elements** and sends each element to all cluster members.
969    ///
970    /// # Non-Determinism
971    /// The set of cluster members may asynchronously change over time. Each element is only broadcast
972    /// to the current cluster members known _at that point in time_ at the source member. Depending
973    /// on when each source member is notified of membership changes, it will broadcast each element
974    /// to different members.
975    ///
976    /// # Example
977    /// ```rust
978    /// # #[cfg(feature = "deploy")] {
979    /// # use hydro_lang::prelude::*;
980    /// # use hydro_lang::location::MemberId;
981    /// # use futures::StreamExt;
982    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
983    /// # type Source = ();
984    /// # type Destination = ();
985    /// let source: Cluster<Source> = flow.cluster::<Source>();
986    /// let numbers: Stream<_, Cluster<Source>, _> = source.source_iter(q!(vec![123]));
987    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
988    /// let on_destination: KeyedStream<MemberId<Source>, _, Cluster<Destination>, _> = numbers.broadcast_bincode(&destination, nondet!(/** assuming stable membership */));
989    /// # on_destination.entries().send_bincode(&p2).entries()
990    /// // if there are 4 members in the destination, each receives one element from each source member
991    /// // - Destination(0): { Source(0): [123], Source(1): [123], ... }
992    /// // - Destination(1): { Source(0): [123], Source(1): [123], ... }
993    /// // - ...
994    /// # }, |mut stream| async move {
995    /// # let mut results = Vec::new();
996    /// # for w in 0..16 {
997    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
998    /// # }
999    /// # results.sort();
1000    /// # assert_eq!(results, vec![
1001    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 123))", "(MemberId::<()>(0), (MemberId::<()>(1), 123))", "(MemberId::<()>(0), (MemberId::<()>(2), 123))", "(MemberId::<()>(0), (MemberId::<()>(3), 123))",
1002    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 123))", "(MemberId::<()>(1), (MemberId::<()>(1), 123))", "(MemberId::<()>(1), (MemberId::<()>(2), 123))", "(MemberId::<()>(1), (MemberId::<()>(3), 123))",
1003    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 123))", "(MemberId::<()>(2), (MemberId::<()>(1), 123))", "(MemberId::<()>(2), (MemberId::<()>(2), 123))", "(MemberId::<()>(2), (MemberId::<()>(3), 123))",
1004    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 123))", "(MemberId::<()>(3), (MemberId::<()>(1), 123))", "(MemberId::<()>(3), (MemberId::<()>(2), 123))", "(MemberId::<()>(3), (MemberId::<()>(3), 123))"
1005    /// # ]);
1006    /// # }));
1007    /// # }
1008    /// ```
1009    pub fn broadcast_bincode<L2: 'a>(
1010        self,
1011        other: &Cluster<'a, L2>,
1012        nondet_membership: NonDet,
1013    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
1014    where
1015        T: Clone + Serialize + DeserializeOwned,
1016    {
1017        self.broadcast(other, TCP.fail_stop().bincode(), nondet_membership)
1018    }
1019
1020    /// Broadcasts elements of this stream at each source member to all members of a destination
1021    /// cluster, using the configuration in `via` to set up the message transport.
1022    ///
1023    /// Each source member sends each of its stream elements to **every** member of the cluster
1024    /// based on its latest membership information. Unlike [`Stream::demux`], which requires
1025    /// `(MemberId, T)` tuples to target specific members, `broadcast` takes a stream of
1026    /// **only data elements** and sends each element to all cluster members.
1027    ///
1028    /// # Non-Determinism
1029    /// The set of cluster members may asynchronously change over time. Each element is only broadcast
1030    /// to the current cluster members known _at that point in time_ at the source member. Depending
1031    /// on when each source member is notified of membership changes, it will broadcast each element
1032    /// to different members.
1033    ///
1034    /// # Example
1035    /// ```rust
1036    /// # #[cfg(feature = "deploy")] {
1037    /// # use hydro_lang::prelude::*;
1038    /// # use hydro_lang::location::MemberId;
1039    /// # use futures::StreamExt;
1040    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
1041    /// # type Source = ();
1042    /// # type Destination = ();
1043    /// let source: Cluster<Source> = flow.cluster::<Source>();
1044    /// let numbers: Stream<_, Cluster<Source>, _> = source.source_iter(q!(vec![123]));
1045    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
1046    /// let on_destination: KeyedStream<MemberId<Source>, _, Cluster<Destination>, _> = numbers.broadcast(&destination, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */));
1047    /// # on_destination.entries().send(&p2, TCP.fail_stop().bincode()).entries()
1048    /// // if there are 4 members in the destination, each receives one element from each source member
1049    /// // - Destination(0): { Source(0): [123], Source(1): [123], ... }
1050    /// // - Destination(1): { Source(0): [123], Source(1): [123], ... }
1051    /// // - ...
1052    /// # }, |mut stream| async move {
1053    /// # let mut results = Vec::new();
1054    /// # for w in 0..16 {
1055    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
1056    /// # }
1057    /// # results.sort();
1058    /// # assert_eq!(results, vec![
1059    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 123))", "(MemberId::<()>(0), (MemberId::<()>(1), 123))", "(MemberId::<()>(0), (MemberId::<()>(2), 123))", "(MemberId::<()>(0), (MemberId::<()>(3), 123))",
1060    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 123))", "(MemberId::<()>(1), (MemberId::<()>(1), 123))", "(MemberId::<()>(1), (MemberId::<()>(2), 123))", "(MemberId::<()>(1), (MemberId::<()>(3), 123))",
1061    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 123))", "(MemberId::<()>(2), (MemberId::<()>(1), 123))", "(MemberId::<()>(2), (MemberId::<()>(2), 123))", "(MemberId::<()>(2), (MemberId::<()>(3), 123))",
1062    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 123))", "(MemberId::<()>(3), (MemberId::<()>(1), 123))", "(MemberId::<()>(3), (MemberId::<()>(2), 123))", "(MemberId::<()>(3), (MemberId::<()>(3), 123))"
1063    /// # ]);
1064    /// # }));
1065    /// # }
1066    /// ```
1067    #[expect(clippy::type_complexity, reason = "MinOrder projection in return type")]
1068    pub fn broadcast<L2: 'a, N: NetworkFor<T>>(
1069        self,
1070        to: &Cluster<'a, L2>,
1071        via: N,
1072        nondet_membership: NonDet,
1073    ) -> KeyedStream<
1074        MemberId<L>,
1075        T,
1076        Cluster<'a, L2>,
1077        Unbounded,
1078        <O as MinOrder<N::OrderingGuarantee>>::Min,
1079        R,
1080    >
1081    where
1082        T: Clone + Serialize + DeserializeOwned,
1083        O: MinOrder<N::OrderingGuarantee>,
1084    {
        // Fold the destination cluster's join/leave events into a per-member presence flag.
1085        let ids = track_membership(self.location.source_cluster_members(to));
1086        sliced! {
1087            let members_snapshot = use(ids, nondet_membership);
1088            let elements = use(self, nondet_membership);
1089
            // Only members whose flag is currently `true` are broadcast targets.
1090            let current_members = members_snapshot.filter(q!(|b| *b));
            // NOTE(review): `repeat_with_keys` presumably yields one copy of each element per
            // key in `current_members`, matching the documented broadcast behaviour — confirm.
1091            elements.repeat_with_keys(current_members)
1092        }
        // Deliver each keyed copy to the member named by its key over the `via` transport.
1093        .demux(to, via)
1094    }
1095
1096    #[cfg(feature = "sim")]
1097    /// Sends elements of this cluster stream to an external location using bincode serialization.
    ///
    /// Allocates the next external port from this location's flow state, registers a
    /// [`HydroRoot::SendExternal`] root that consumes this stream's IR node, and returns an
    /// [`ExternalBincodeStream`] handle carrying `other`'s key and the allocated port id.
1098    fn send_bincode_external<L2>(self, other: &External<L2>) -> ExternalBincodeStream<T, O, R>
1099    where
1100        T: Serialize + DeserializeOwned,
1101    {
        // NOTE(review): `false` is presumably the "is demux" flag of the serializer helper
        // defined elsewhere in this file — confirm.
1102        let serialize_pipeline = Some(serialize_bincode::<T>(false));
1103
        // The mutable borrow of the flow state is held for both the port allocation and the
        // root registration below.
1104        let mut flow_state_borrow = self.location.flow_state().borrow_mut();
1105
1106        let external_port_id = flow_state_borrow.next_external_port();
1107
1108        flow_state_borrow.push_root(HydroRoot::SendExternal {
1109            to_external_key: other.key,
1110            to_port_id: external_port_id,
1111            to_many: false,
1112            unpaired: true,
1113            serialize_fn: serialize_pipeline.map(|e| e.into()),
1114            instantiate_fn: DebugInstantiate::Building,
            // Take this stream's IR node, leaving a placeholder behind.
1115            input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1116            op_metadata: HydroIrOpMetadata::new(),
1117        });
1118
        // The handle only records where the data goes; the element type lives in `PhantomData`.
1119        ExternalBincodeStream {
1120            process_key: other.key,
1121            port_id: external_port_id,
1122            _phantom: PhantomData,
1123        }
1124    }
1125
1126    #[cfg(feature = "sim")]
1127    /// Sets up a simulation output port for this cluster stream, allowing test code
1128    /// to receive `(member_id, T)` pairs during simulation.
    ///
    /// Internally this builds an [`External`] handle that shares this location's flow state,
    /// sends the stream to it with `send_bincode_external`, and wraps the resulting port id in a
    /// `SimClusterReceiver`.
1129    pub fn sim_cluster_output(self) -> crate::sim::SimClusterReceiver<T, O, R>
1130    where
1131        T: Serialize + DeserializeOwned,
1132    {
        // NOTE(review): assumes `LocationKey::FIRST` is the key reserved for the simulator's
        // external endpoint — confirm.
1133        let external_location: External<'a, ()> = External {
1134            key: LocationKey::FIRST,
1135            flow_state: self.location.flow_state().clone(),
1136            _phantom: PhantomData,
1137        };
1138
1139        let external = self.send_bincode_external(&external_location);
1140
        // Only the port id is needed to identify the output on the receiving side.
1141        crate::sim::SimClusterReceiver(external.port_id, PhantomData)
1142    }
1143}
1144
1145impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
1146    Stream<(MemberId<L2>, T), Cluster<'a, L>, B, O, R>
1147{
1148    #[deprecated = "use Stream::demux(..., TCP.fail_stop().bincode()) instead"]
1149    /// Sends elements of this stream at each source member to specific members of a destination
1150    /// cluster, identified by a [`MemberId`], using [`bincode`] to serialize/deserialize messages.
    ///
    /// This is a thin wrapper that forwards to [`Stream::demux`] with
    /// `TCP.fail_stop().bincode()` as the network configuration.
1151    ///
1152    /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
1153    /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`],
1154    /// this API allows precise targeting of specific cluster members rather than broadcasting to
1155    /// all members.
1156    ///
1157    /// Each cluster member sends its local stream elements, and they are collected at each
1158    /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
1159    ///
1160    /// # Example
1161    /// ```rust
1162    /// # #[cfg(feature = "deploy")] {
1163    /// # use hydro_lang::prelude::*;
1164    /// # use futures::StreamExt;
1165    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
1166    /// # type Source = ();
1167    /// # type Destination = ();
1168    /// let source: Cluster<Source> = flow.cluster::<Source>();
1169    /// let to_send: Stream<_, Cluster<_>, _> = source
1170    ///     .source_iter(q!(vec![0, 1, 2, 3]))
1171    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)));
1172    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
1173    /// let all_received = to_send.demux_bincode(&destination); // KeyedStream<MemberId<Source>, i32, ...>
1174    /// # all_received.entries().send_bincode(&p2).entries()
1175    /// # }, |mut stream| async move {
1176    /// // if there are 4 members in the destination cluster, each receives one message from each source member
1177    /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
1178    /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
1179    /// // - ...
1180    /// # let mut results = Vec::new();
1181    /// # for w in 0..16 {
1182    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
1183    /// # }
1184    /// # results.sort();
1185    /// # assert_eq!(results, vec![
1186    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
1187    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
1188    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
1189    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
1190    /// # ]);
1191    /// # }));
1192    /// # }
1193    /// ```
1194    pub fn demux_bincode(
1195        self,
1196        other: &Cluster<'a, L2>,
1197    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
1198    where
1199        T: Serialize + DeserializeOwned,
1200    {
1201        self.demux(other, TCP.fail_stop().bincode())
1202    }
1203
1204    /// Sends elements of this stream at each source member to specific members of a destination
1205    /// cluster, identified by a [`MemberId`], using the configuration in `via` to set up the
1206    /// message transport.
1207    ///
1208    /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
1209    /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast`],
1210    /// this API allows precise targeting of specific cluster members rather than broadcasting to
1211    /// all members.
1212    ///
1213    /// Each cluster member sends its local stream elements, and they are collected at each
1214    /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
1215    ///
1216    /// # Example
1217    /// ```rust
1218    /// # #[cfg(feature = "deploy")] {
1219    /// # use hydro_lang::prelude::*;
1220    /// # use futures::StreamExt;
1221    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
1222    /// # type Source = ();
1223    /// # type Destination = ();
1224    /// let source: Cluster<Source> = flow.cluster::<Source>();
1225    /// let to_send: Stream<_, Cluster<_>, _> = source
1226    ///     .source_iter(q!(vec![0, 1, 2, 3]))
1227    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)));
1228    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
1229    /// let all_received = to_send.demux(&destination, TCP.fail_stop().bincode()); // KeyedStream<MemberId<Source>, i32, ...>
1230    /// # all_received.entries().send(&p2, TCP.fail_stop().bincode()).entries()
1231    /// # }, |mut stream| async move {
1232    /// // if there are 4 members in the destination cluster, each receives one message from each source member
1233    /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
1234    /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
1235    /// // - ...
1236    /// # let mut results = Vec::new();
1237    /// # for w in 0..16 {
1238    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
1239    /// # }
1240    /// # results.sort();
1241    /// # assert_eq!(results, vec![
1242    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
1243    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
1244    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
1245    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
1246    /// # ]);
1247    /// # }));
1248    /// # }
1249    /// ```
1250    #[expect(clippy::type_complexity, reason = "MinOrder projection in return type")]
1251    pub fn demux<N: NetworkFor<T>>(
1252        self,
1253        to: &Cluster<'a, L2>,
1254        via: N,
1255    ) -> KeyedStream<
1256        MemberId<L>,
1257        T,
1258        Cluster<'a, L2>,
1259        Unbounded,
1260        <O as MinOrder<N::OrderingGuarantee>>::Min,
1261        R,
1262    >
1263    where
1264        T: Serialize + DeserializeOwned,
1265        O: MinOrder<N::OrderingGuarantee>,
1266    {
        // Key the `(MemberId<L2>, T)` pairs by destination member, then reuse the keyed-stream
        // `demux`, which performs the actual send.
1267        self.into_keyed().demux(to, via)
1268    }
1269}
1270
1271#[cfg(test)]
1272mod tests {
1273    #[cfg(feature = "sim")]
1274    use stageleft::q;
1275
1276    #[cfg(feature = "sim")]
1277    use crate::live_collections::sliced::sliced;
1278    #[cfg(feature = "sim")]
1279    use crate::location::{Location, MemberId};
1280    #[cfg(feature = "sim")]
1281    use crate::networking::TCP;
1282    #[cfg(feature = "sim")]
1283    use crate::nondet::nondet;
1284    #[cfg(feature = "sim")]
1285    use crate::prelude::FlowBuilder;
1286
1287    #[cfg(feature = "sim")]
1288    #[test]
    /// Process-to-process send under exhaustive simulation: three unit values are pushed into
    /// `node`, sent to `node2`, batched per tick and counted. However the messages are batched,
    /// the per-tick counts must add up to 3.
1289    fn sim_send_bincode_o2o() {
        // NOTE(review): `TCP` is also imported at module level under the same `cfg`; this local
        // `use` looks redundant — confirm before removing.
1290        use crate::networking::TCP;
1291
1292        let mut flow = FlowBuilder::new();
1293        let node = flow.process::<()>();
1294        let node2 = flow.process::<()>();
1295
1296        let (in_send, input) = node.sim_input();
1297
        // Count the elements received in each tick on `node2` and expose the counts to the test.
1298        let out_recv = input
1299            .send(&node2, TCP.fail_stop().bincode())
1300            .batch(&node2.tick(), nondet!(/** test */))
1301            .count()
1302            .all_ticks()
1303            .sim_output();
1304
1305        let instances = flow.sim().exhaustive(async || {
1306            in_send.send(());
1307            in_send.send(());
1308            in_send.send(());
1309
1310            let received = out_recv.collect::<Vec<_>>().await;
1311            assert!(received.into_iter().sum::<usize>() == 3);
1312        });
1313
        // The exhaustive search is expected to explore exactly 4 executions.
1314        assert_eq!(instances, 4); // 2^{3 - 1}
1315    }
1316
1317    #[cfg(feature = "sim")]
1318    #[test]
    /// Cluster-to-process send under exhaustive simulation with a 4-member cluster: each member
    /// emits a single `1`, and the process must see exactly one `(member id, 1)` entry for each
    /// of the ids 0 through 3, in any order.
1319    fn sim_send_bincode_m2o() {
1320        let mut flow = FlowBuilder::new();
1321        let cluster = flow.cluster::<()>();
1322        let node = flow.process::<()>();
1323
1324        let input = cluster.source_iter(q!(vec![1]));
1325
        // Flatten the keyed result into `(member id, value)` entries, batched per tick on `node`.
1326        let out_recv = input
1327            .send(&node, TCP.fail_stop().bincode())
1328            .entries()
1329            .batch(&node.tick(), nondet!(/** test */))
1330            .all_ticks()
1331            .sim_output();
1332
1333        let instances = flow
1334            .sim()
1335            .with_cluster_size(&cluster, 4)
1336            .exhaustive(async || {
1337                out_recv
1338                    .assert_yields_only_unordered(vec![
1339                        (MemberId::from_raw_id(0), 1),
1340                        (MemberId::from_raw_id(1), 1),
1341                        (MemberId::from_raw_id(2), 1),
1342                        (MemberId::from_raw_id(3), 1),
1343                    ])
1344                    .await
1345            });
1346
        // The exhaustive search is expected to explore exactly 75 executions.
1347        assert_eq!(instances, 75); // ∑ (k=1 to 4) S(4,k) × k! = 75
1348    }
1349
1350    #[cfg(feature = "sim")]
1351    #[test]
1352    fn sim_send_bincode_multiple_m2o() {
1353        let mut flow = FlowBuilder::new();
1354        let cluster1 = flow.cluster::<()>();
1355        let cluster2 = flow.cluster::<()>();
1356        let node = flow.process::<()>();
1357
1358        let out_recv_1 = cluster1
1359            .source_iter(q!(vec![1]))
1360            .send(&node, TCP.fail_stop().bincode())
1361            .entries()
1362            .sim_output();
1363
1364        let out_recv_2 = cluster2
1365            .source_iter(q!(vec![2]))
1366            .send(&node, TCP.fail_stop().bincode())
1367            .entries()
1368            .sim_output();
1369
1370        let instances = flow
1371            .sim()
1372            .with_cluster_size(&cluster1, 3)
1373            .with_cluster_size(&cluster2, 4)
1374            .exhaustive(async || {
1375                out_recv_1
1376                    .assert_yields_only_unordered(vec![
1377                        (MemberId::from_raw_id(0), 1),
1378                        (MemberId::from_raw_id(1), 1),
1379                        (MemberId::from_raw_id(2), 1),
1380                    ])
1381                    .await;
1382
1383                out_recv_2
1384                    .assert_yields_only_unordered(vec![
1385                        (MemberId::from_raw_id(0), 2),
1386                        (MemberId::from_raw_id(1), 2),
1387                        (MemberId::from_raw_id(2), 2),
1388                        (MemberId::from_raw_id(3), 2),
1389                    ])
1390                    .await;
1391            });
1392
1393        assert_eq!(instances, 1);
1394    }
1395
1396    #[cfg(feature = "sim")]
1397    #[test]
1398    fn sim_send_bincode_o2m() {
1399        let mut flow = FlowBuilder::new();
1400        let cluster = flow.cluster::<()>();
1401        let node = flow.process::<()>();
1402
1403        let input = node.source_iter(q!(vec![
1404            (MemberId::from_raw_id(0), 123),
1405            (MemberId::from_raw_id(1), 456),
1406        ]));
1407
1408        let out_recv = input
1409            .demux(&cluster, TCP.fail_stop().bincode())
1410            .map(q!(|x| x + 1))
1411            .send(&node, TCP.fail_stop().bincode())
1412            .entries()
1413            .sim_output();
1414
1415        flow.sim()
1416            .with_cluster_size(&cluster, 4)
1417            .exhaustive(async || {
1418                out_recv
1419                    .assert_yields_only_unordered(vec![
1420                        (MemberId::from_raw_id(0), 124),
1421                        (MemberId::from_raw_id(1), 457),
1422                    ])
1423                    .await
1424            });
1425    }
1426
1427    #[cfg(feature = "sim")]
1428    #[test]
1429    fn sim_broadcast_bincode_o2m() {
1430        let mut flow = FlowBuilder::new();
1431        let cluster = flow.cluster::<()>();
1432        let node = flow.process::<()>();
1433
1434        let input = node.source_iter(q!(vec![123, 456]));
1435
1436        let out_recv = input
1437            .broadcast(&cluster, TCP.fail_stop().bincode(), nondet!(/** test */))
1438            .map(q!(|x| x + 1))
1439            .send(&node, TCP.fail_stop().bincode())
1440            .entries()
1441            .sim_output();
1442
1443        let mut c_1_produced = false;
1444        let mut c_2_produced = false;
1445
1446        flow.sim()
1447            .with_cluster_size(&cluster, 2)
1448            .exhaustive(async || {
1449                let all_out = out_recv.collect_sorted::<Vec<_>>().await;
1450
1451                // check that order is preserved
1452                if all_out.contains(&(MemberId::from_raw_id(0), 124)) {
1453                    assert!(all_out.contains(&(MemberId::from_raw_id(0), 457)));
1454                    c_1_produced = true;
1455                }
1456
1457                if all_out.contains(&(MemberId::from_raw_id(1), 124)) {
1458                    assert!(all_out.contains(&(MemberId::from_raw_id(1), 457)));
1459                    c_2_produced = true;
1460                }
1461            });
1462
1463        assert!(c_1_produced && c_2_produced); // in at least one execution each, the cluster member received both messages
1464    }
1465
1466    #[cfg(feature = "sim")]
1467    #[test]
1468    fn sim_send_bincode_m2m() {
1469        let mut flow = FlowBuilder::new();
1470        let cluster = flow.cluster::<()>();
1471        let node = flow.process::<()>();
1472
1473        let input = node.source_iter(q!(vec![
1474            (MemberId::from_raw_id(0), 123),
1475            (MemberId::from_raw_id(1), 456),
1476        ]));
1477
1478        let out_recv = input
1479            .demux(&cluster, TCP.fail_stop().bincode())
1480            .map(q!(|x| x + 1))
1481            .flat_map_ordered(q!(|x| vec![
1482                (MemberId::from_raw_id(0), x),
1483                (MemberId::from_raw_id(1), x),
1484            ]))
1485            .demux(&cluster, TCP.fail_stop().bincode())
1486            .entries()
1487            .send(&node, TCP.fail_stop().bincode())
1488            .entries()
1489            .sim_output();
1490
1491        flow.sim()
1492            .with_cluster_size(&cluster, 4)
1493            .exhaustive(async || {
1494                out_recv
1495                    .assert_yields_only_unordered(vec![
1496                        (MemberId::from_raw_id(0), (MemberId::from_raw_id(0), 124)),
1497                        (MemberId::from_raw_id(0), (MemberId::from_raw_id(1), 457)),
1498                        (MemberId::from_raw_id(1), (MemberId::from_raw_id(0), 124)),
1499                        (MemberId::from_raw_id(1), (MemberId::from_raw_id(1), 457)),
1500                    ])
1501                    .await
1502            });
1503    }
1504
1505    #[cfg(feature = "sim")]
1506    #[test]
1507    fn sim_lossy_delayed_forever_o2o() {
1508        use std::collections::HashSet;
1509
1510        use crate::properties::manual_proof;
1511
1512        let mut flow = FlowBuilder::new();
1513        let node = flow.process::<()>();
1514        let node2 = flow.process::<()>();
1515
1516        let received = node
1517            .source_iter(q!(0..3_u32))
1518            .send(&node2, TCP.lossy_delayed_forever().bincode())
1519            .fold(
1520                q!(|| std::collections::HashSet::<u32>::new()),
1521                q!(
1522                    |set, v| {
1523                        set.insert(v);
1524                    },
1525                    commutative = manual_proof!(/** set insert is commutative */)
1526                ),
1527            );
1528
1529        let out_recv = sliced! {
1530            let snapshot = use(received, nondet!(/** test */));
1531            snapshot.into_stream()
1532        }
1533        .sim_output();
1534
1535        let mut saw_non_contiguous = false;
1536
1537        flow.sim().test_safety_only().exhaustive(async || {
1538            let snapshots = out_recv.collect::<Vec<HashSet<u32>>>().await;
1539
1540            // Check each individual snapshot for a non-contiguous subset.
1541            for set in &snapshots {
1542                #[expect(clippy::disallowed_methods, reason = "min / max are deterministic")]
1543                if set.len() >= 2 && set.len() < 3 {
1544                    let min = *set.iter().min().unwrap();
1545                    let max = *set.iter().max().unwrap();
1546                    if set.len() < (max - min + 1) as usize {
1547                        saw_non_contiguous = true;
1548                    }
1549                }
1550            }
1551        });
1552
1553        assert!(
1554            saw_non_contiguous,
1555            "Expected at least one execution with a non-contiguous subset of inputs"
1556        );
1557    }
1558}