Skip to main content

hydro_lang/location/
mod.rs

1//! Type definitions for distributed locations, which specify where pieces of a Hydro
2//! program will be executed.
3//!
4//! Hydro is a **global**, **distributed** programming model. This means that the data
5//! and computation in a Hydro program can be spread across multiple machines, data
6//! centers, and even continents. To achieve this, Hydro uses the concept of
7//! **locations** to keep track of _where_ data is located and computation is executed.
8//!
9//! Each live collection type (in [`crate::live_collections`]) has a type parameter `L`
10//! which will always be a type that implements the [`Location`] trait (e.g. [`Process`]
11//! and [`Cluster`]). To create distributed programs, Hydro provides a variety of APIs
12//! to allow live collections to be _moved_ between locations via network send/receive.
13//!
14//! See [the Hydro docs](https://hydro.run/docs/hydro/reference/locations/) for more information.
15
16use std::fmt::Debug;
17use std::future::Future;
18use std::marker::PhantomData;
19use std::num::ParseIntError;
20use std::time::Duration;
21
22use bytes::{Bytes, BytesMut};
23use futures::stream::Stream as FuturesStream;
24use proc_macro2::Span;
25use quote::quote;
26use serde::de::DeserializeOwned;
27use serde::{Deserialize, Serialize};
28use slotmap::{Key, new_key_type};
29use stageleft::runtime_support::{FreeVariableWithContextWithProps, QuoteTokens};
30use stageleft::{QuotedWithContext, q, quote_type};
31use syn::parse_quote;
32use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec};
33
34use crate::compile::ir::{
35    ClusterMembersState, DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot, HydroSource,
36};
37use crate::forward_handle::ForwardRef;
38#[cfg(stageleft_runtime)]
39use crate::forward_handle::{CycleCollection, ForwardHandle};
40use crate::live_collections::boundedness::{Bounded, Unbounded};
41use crate::live_collections::keyed_stream::KeyedStream;
42use crate::live_collections::singleton::Singleton;
43use crate::live_collections::stream::{
44    ExactlyOnce, NoOrder, Ordering, Retries, Stream, TotalOrder,
45};
46use crate::location::dynamic::LocationId;
47use crate::location::external_process::{
48    ExternalBincodeBidi, ExternalBincodeSink, ExternalBytesPort, Many, NotMany,
49};
50use crate::nondet::NonDet;
51#[cfg(feature = "sim")]
52use crate::sim::SimSender;
53use crate::staging_util::get_this_crate;
54
55pub mod dynamic;
56
57pub mod external_process;
58pub use external_process::External;
59
60pub mod process;
61pub use process::Process;
62
63pub mod cluster;
64pub use cluster::Cluster;
65
66pub mod member_id;
67pub use member_id::{MemberId, TaglessMemberId};
68
69pub mod tick;
70pub use tick::{Atomic, NoTick, Tick};
71
72/// An event indicating a change in membership status of a location in a group
73/// (e.g. a node in a [`Cluster`] or an external client connection).
74#[derive(PartialEq, Eq, Clone, Debug, Hash, Serialize, Deserialize)]
75pub enum MembershipEvent {
76    /// The member has joined the group and is now active.
77    Joined,
78    /// The member has left the group and is no longer active.
79    Left,
80}
81
/// Configuration hint for the network transport backing an external connection.
///
/// The hint is passed to methods such as [`Location::bind_single_client`] and
/// [`Location::bidi_external_many_bytes`] and controls how the underlying TCP
/// listener for external clients is set up.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum NetworkHint {
    /// Let the network configuration be chosen automatically (e.g. an ephemeral port).
    Auto,
    /// Listen on a TCP port.
    ///
    /// `Some(port)` requests that exact port number; `None` lets an available
    /// port be picked automatically.
    TcpPort(Option<u16>),
}
97
98pub(crate) fn check_matching_location<'a, L: Location<'a>>(l1: &L, l2: &L) {
99    assert_eq!(Location::id(l1), Location::id(l2), "locations do not match");
100}
101
#[stageleft::export(LocationKey)]
new_key_type! {
    /// A unique key identifying a location.
    ///
    /// Rendered by `Display` as `loc` followed by the key data (e.g. `loc1v1`).
    pub struct LocationKey;
}
107
108impl std::fmt::Display for LocationKey {
109    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
110        write!(f, "loc{:?}", self.data()) // `"loc1v1"``
111    }
112}
113
114/// This is used for the ECS membership stream.
115/// TODO(mingwei): Make this more robust?
116impl std::str::FromStr for LocationKey {
117    type Err = Option<ParseIntError>;
118
119    fn from_str(s: &str) -> Result<Self, Self::Err> {
120        let nvn = s.strip_prefix("loc").ok_or(None)?;
121        let (idx, ver) = nvn.split_once("v").ok_or(None)?;
122        let idx: u64 = idx.parse()?;
123        let ver: u64 = ver.parse()?;
124        Ok(slotmap::KeyData::from_ffi((ver << 32) | idx).into())
125    }
126}
127
128impl LocationKey {
129    /// TODO(minwgei): Remove this and avoid magic key for simulator external.
130    /// The first location key, used by the simulator as the default external location.
131    pub const FIRST: Self = Self(slotmap::KeyData::from_ffi(0x0000000100000001)); // `1v1`
132
133    /// A key for testing with index 1.
134    #[cfg(test)]
135    pub const TEST_KEY_1: Self = Self(slotmap::KeyData::from_ffi(0x000000ff00000001)); // `1v255`
136
137    /// A key for testing with index 2.
138    #[cfg(test)]
139    pub const TEST_KEY_2: Self = Self(slotmap::KeyData::from_ffi(0x000000ff00000002)); // `2v255`
140}
141
142/// This is used within `q!` code in docker and ECS.
143impl<Ctx> FreeVariableWithContextWithProps<Ctx, ()> for LocationKey {
144    type O = LocationKey;
145
146    fn to_tokens(self, _ctx: &Ctx) -> (QuoteTokens, ())
147    where
148        Self: Sized,
149    {
150        let root = get_this_crate();
151        let n = Key::data(&self).as_ffi();
152        (
153            QuoteTokens {
154                prelude: None,
155                expr: Some(quote! {
156                    #root::location::LocationKey::from(#root::runtime_support::slotmap::KeyData::from_ffi(#n))
157                }),
158            },
159            (),
160        )
161    }
162}
163
164/// A simple enum for the type of a root location.
165#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq, Serialize)]
166pub enum LocationType {
167    /// A process (single node).
168    Process,
169    /// A cluster (multiple nodes).
170    Cluster,
171    /// An external client.
172    External,
173}
174
175/// A location where data can be materialized and computation can be executed.
176///
177/// Hydro is a **global**, **distributed** programming model. This means that the data
178/// and computation in a Hydro program can be spread across multiple machines, data
179/// centers, and even continents. To achieve this, Hydro uses the concept of
180/// **locations** to keep track of _where_ data is located and computation is executed.
181///
182/// Each live collection type (in [`crate::live_collections`]) has a type parameter `L`
183/// which will always be a type that implements the [`Location`] trait (e.g. [`Process`]
184/// and [`Cluster`]). To create distributed programs, Hydro provides a variety of APIs
185/// to allow live collections to be _moved_ between locations via network send/receive.
186///
187/// See [the Hydro docs](https://hydro.run/docs/hydro/reference/locations/) for more information.
188#[expect(
189    private_bounds,
190    reason = "only internal Hydro code can define location types"
191)]
192pub trait Location<'a>: dynamic::DynLocation {
193    /// The root location type for this location.
194    ///
195    /// For top-level locations like [`Process`] and [`Cluster`], this is `Self`.
196    /// For nested locations like [`Tick`], this is the root location that contains it.
197    type Root: Location<'a>;
198
199    /// Returns the root location for this location.
200    ///
201    /// For top-level locations like [`Process`] and [`Cluster`], this returns `self`.
202    /// For nested locations like [`Tick`], this returns the root location that contains it.
203    fn root(&self) -> Self::Root;
204
205    /// Attempts to create a new [`Tick`] clock domain at this location.
206    ///
207    /// Returns `Some(Tick)` if this is a top-level location (like [`Process`] or [`Cluster`]),
208    /// or `None` if this location is already inside a tick (nested ticks are not supported).
209    ///
210    /// Prefer using [`Location::tick`] when you know the location is top-level.
211    fn try_tick(&self) -> Option<Tick<Self>> {
212        if Self::is_top_level() {
213            let id = self.flow_state().borrow_mut().next_clock_id();
214            Some(Tick {
215                id,
216                l: self.clone(),
217            })
218        } else {
219            None
220        }
221    }
222
223    /// Returns the unique identifier for this location.
224    fn id(&self) -> LocationId {
225        dynamic::DynLocation::id(self)
226    }
227
228    /// Creates a new [`Tick`] clock domain at this location.
229    ///
230    /// A tick represents a logical clock that can be used to batch streaming data
231    /// into discrete time steps. This is useful for implementing iterative algorithms
232    /// or for synchronizing data across multiple streams.
233    ///
234    /// # Example
235    /// ```rust
236    /// # #[cfg(feature = "deploy")] {
237    /// # use hydro_lang::prelude::*;
238    /// # use futures::StreamExt;
239    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
240    /// let tick = process.tick();
241    /// let inside_tick = process
242    ///     .source_iter(q!(vec![1, 2, 3, 4]))
243    ///     .batch(&tick, nondet!(/** test */));
244    /// inside_tick.all_ticks()
245    /// # }, |mut stream| async move {
246    /// // 1, 2, 3, 4
247    /// # for w in vec![1, 2, 3, 4] {
248    /// #     assert_eq!(stream.next().await.unwrap(), w);
249    /// # }
250    /// # }));
251    /// # }
252    /// ```
253    fn tick(&self) -> Tick<Self>
254    where
255        Self: NoTick,
256    {
257        let id = self.flow_state().borrow_mut().next_clock_id();
258        Tick {
259            id,
260            l: self.clone(),
261        }
262    }
263
264    /// Creates an unbounded stream that continuously emits unit values `()`.
265    ///
266    /// This is useful for driving computations that need to run continuously,
267    /// such as polling or heartbeat mechanisms.
268    ///
269    /// # Example
270    /// ```rust
271    /// # #[cfg(feature = "deploy")] {
272    /// # use hydro_lang::prelude::*;
273    /// # use futures::StreamExt;
274    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
275    /// let tick = process.tick();
276    /// process.spin()
277    ///     .batch(&tick, nondet!(/** test */))
278    ///     .map(q!(|_| 42))
279    ///     .all_ticks()
280    /// # }, |mut stream| async move {
281    /// // 42, 42, 42, ...
282    /// # assert_eq!(stream.next().await.unwrap(), 42);
283    /// # assert_eq!(stream.next().await.unwrap(), 42);
284    /// # assert_eq!(stream.next().await.unwrap(), 42);
285    /// # }));
286    /// # }
287    /// ```
288    fn spin(&self) -> Stream<(), Self, Unbounded, TotalOrder, ExactlyOnce>
289    where
290        Self: Sized + NoTick,
291    {
292        Stream::new(
293            self.clone(),
294            HydroNode::Source {
295                source: HydroSource::Spin(),
296                metadata: self.new_node_metadata(Stream::<
297                    (),
298                    Self,
299                    Unbounded,
300                    TotalOrder,
301                    ExactlyOnce,
302                >::collection_kind()),
303            },
304        )
305    }
306
307    /// Creates a stream from an async [`FuturesStream`].
308    ///
309    /// This is useful for integrating with external async data sources,
310    /// such as network connections or file readers.
311    ///
312    /// # Example
313    /// ```rust
314    /// # #[cfg(feature = "deploy")] {
315    /// # use hydro_lang::prelude::*;
316    /// # use futures::StreamExt;
317    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
318    /// process.source_stream(q!(futures::stream::iter(vec![1, 2, 3])))
319    /// # }, |mut stream| async move {
320    /// // 1, 2, 3
321    /// # for w in vec![1, 2, 3] {
322    /// #     assert_eq!(stream.next().await.unwrap(), w);
323    /// # }
324    /// # }));
325    /// # }
326    /// ```
327    fn source_stream<T, E>(
328        &self,
329        e: impl QuotedWithContext<'a, E, Self>,
330    ) -> Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>
331    where
332        E: FuturesStream<Item = T> + Unpin,
333        Self: Sized + NoTick,
334    {
335        let e = e.splice_untyped_ctx(self);
336
337        Stream::new(
338            self.clone(),
339            HydroNode::Source {
340                source: HydroSource::Stream(e.into()),
341                metadata: self.new_node_metadata(Stream::<
342                    T,
343                    Self,
344                    Unbounded,
345                    TotalOrder,
346                    ExactlyOnce,
347                >::collection_kind()),
348            },
349        )
350    }
351
352    /// Creates a bounded stream from an iterator.
353    ///
354    /// The iterator is evaluated once at runtime, and all elements are emitted
355    /// in order. This is useful for creating streams from static data or
356    /// for testing.
357    ///
358    /// # Example
359    /// ```rust
360    /// # #[cfg(feature = "deploy")] {
361    /// # use hydro_lang::prelude::*;
362    /// # use futures::StreamExt;
363    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
364    /// process.source_iter(q!(vec![1, 2, 3, 4]))
365    /// # }, |mut stream| async move {
366    /// // 1, 2, 3, 4
367    /// # for w in vec![1, 2, 3, 4] {
368    /// #     assert_eq!(stream.next().await.unwrap(), w);
369    /// # }
370    /// # }));
371    /// # }
372    /// ```
373    fn source_iter<T, E>(
374        &self,
375        e: impl QuotedWithContext<'a, E, Self>,
376    ) -> Stream<T, Self, Bounded, TotalOrder, ExactlyOnce>
377    where
378        E: IntoIterator<Item = T>,
379        Self: Sized,
380    {
381        let e = e.splice_typed_ctx(self);
382
383        Stream::new(
384            self.clone(),
385            HydroNode::Source {
386                source: HydroSource::Iter(e.into()),
387                metadata: self.new_node_metadata(
388                    Stream::<T, Self, Bounded, TotalOrder, ExactlyOnce>::collection_kind(),
389                ),
390            },
391        )
392    }
393
394    /// Creates a stream of membership events for a cluster.
395    ///
396    /// This stream emits [`MembershipEvent::Joined`] when a cluster member joins
397    /// and [`MembershipEvent::Left`] when a cluster member leaves. The stream is
398    /// keyed by the [`MemberId`] of the cluster member.
399    ///
400    /// This is useful for implementing protocols that need to track cluster membership,
401    /// such as broadcasting to all members or detecting failures.
402    ///
403    /// # Example
404    /// ```rust
405    /// # #[cfg(feature = "deploy")] {
406    /// # use hydro_lang::prelude::*;
407    /// # use futures::StreamExt;
408    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
409    /// let p1 = flow.process::<()>();
410    /// let workers: Cluster<()> = flow.cluster::<()>();
411    /// # // do nothing on each worker
412    /// # workers.source_iter(q!(vec![])).for_each(q!(|_: ()| {}));
413    /// let cluster_members = p1.source_cluster_members(&workers);
414    /// # cluster_members.entries().send(&p2, TCP.fail_stop().bincode())
415    /// // if there are 4 members in the cluster, we would see a join event for each
    /// // { MemberId::<Worker>(0): [MembershipEvent::Joined], MemberId::<Worker>(2): [MembershipEvent::Joined], ... }
417    /// # }, |mut stream| async move {
418    /// # let mut results = Vec::new();
419    /// # for w in 0..4 {
420    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
421    /// # }
422    /// # results.sort();
423    /// # assert_eq!(results, vec!["(MemberId::<()>(0), Joined)", "(MemberId::<()>(1), Joined)", "(MemberId::<()>(2), Joined)", "(MemberId::<()>(3), Joined)"]);
424    /// # }));
425    /// # }
426    /// ```
427    fn source_cluster_members<C: 'a>(
428        &self,
429        cluster: &Cluster<'a, C>,
430    ) -> KeyedStream<MemberId<C>, MembershipEvent, Self, Unbounded>
431    where
432        Self: Sized + NoTick,
433    {
434        Stream::new(
435            self.clone(),
436            HydroNode::Source {
437                source: HydroSource::ClusterMembers(cluster.id(), ClusterMembersState::Uninit),
438                metadata: self.new_node_metadata(Stream::<
439                    (TaglessMemberId, MembershipEvent),
440                    Self,
441                    Unbounded,
442                    TotalOrder,
443                    ExactlyOnce,
444                >::collection_kind()),
445            },
446        )
447        .map(q!(|(k, v)| (MemberId::from_tagless(k), v)))
448        .into_keyed()
449    }
450
451    /// Creates a one-way connection from an external process to receive raw bytes.
452    ///
453    /// Returns a port handle for the external process to connect to, and a stream
454    /// of received byte buffers.
455    ///
456    /// For bidirectional communication or typed data, see [`Location::bind_single_client`]
457    /// or [`Location::source_external_bincode`].
458    fn source_external_bytes<L>(
459        &self,
460        from: &External<L>,
461    ) -> (
462        ExternalBytesPort,
463        Stream<BytesMut, Self, Unbounded, TotalOrder, ExactlyOnce>,
464    )
465    where
466        Self: Sized + NoTick,
467    {
468        let (port, stream, sink) =
469            self.bind_single_client::<_, Bytes, LengthDelimitedCodec>(from, NetworkHint::Auto);
470
471        sink.complete(self.source_iter(q!([])));
472
473        (port, stream)
474    }
475
476    /// Creates a one-way connection from an external process to receive bincode-serialized data.
477    ///
478    /// Returns a sink handle for the external process to send data to, and a stream
479    /// of received values.
480    ///
481    /// For bidirectional communication, see [`Location::bind_single_client_bincode`].
482    #[expect(clippy::type_complexity, reason = "stream markers")]
483    fn source_external_bincode<L, T, O: Ordering, R: Retries>(
484        &self,
485        from: &External<L>,
486    ) -> (
487        ExternalBincodeSink<T, NotMany, O, R>,
488        Stream<T, Self, Unbounded, O, R>,
489    )
490    where
491        Self: Sized + NoTick,
492        T: Serialize + DeserializeOwned,
493    {
494        let (port, stream, sink) = self.bind_single_client_bincode::<_, T, ()>(from);
495        sink.complete(self.source_iter(q!([])));
496
497        (
498            ExternalBincodeSink {
499                process_key: from.key,
500                port_id: port.port_id,
501                _phantom: PhantomData,
502            },
503            stream.weaken_ordering().weaken_retries(),
504        )
505    }
506
507    /// Sets up a simulated input port on this location for testing.
508    ///
509    /// Returns a handle to send messages to the location as well as a stream
510    /// of received messages. This is only available when the `sim` feature is enabled.
511    #[cfg(feature = "sim")]
512    #[expect(clippy::type_complexity, reason = "stream markers")]
513    fn sim_input<T, O: Ordering, R: Retries>(
514        &self,
515    ) -> (SimSender<T, O, R>, Stream<T, Self, Unbounded, O, R>)
516    where
517        Self: Sized + NoTick,
518        T: Serialize + DeserializeOwned,
519    {
520        let external_location: External<'a, ()> = External {
521            key: LocationKey::FIRST,
522            flow_state: self.flow_state().clone(),
523            _phantom: PhantomData,
524        };
525
526        let (external, stream) = self.source_external_bincode(&external_location);
527
528        (SimSender(external.port_id, PhantomData), stream)
529    }
530
531    /// Creates an external input stream for embedded deployment mode.
532    ///
533    /// The `name` parameter specifies the name of the generated function parameter
534    /// that will supply data to this stream at runtime. The generated function will
535    /// accept an `impl Stream<Item = T> + Unpin` argument with this name.
536    fn embedded_input<T>(
537        &self,
538        name: impl Into<String>,
539    ) -> Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>
540    where
541        Self: Sized + NoTick,
542    {
543        let ident = syn::Ident::new(&name.into(), Span::call_site());
544
545        Stream::new(
546            self.clone(),
547            HydroNode::Source {
548                source: HydroSource::Embedded(ident),
549                metadata: self.new_node_metadata(Stream::<
550                    T,
551                    Self,
552                    Unbounded,
553                    TotalOrder,
554                    ExactlyOnce,
555                >::collection_kind()),
556            },
557        )
558    }
559
560    /// Creates an embedded singleton input for embedded deployment mode.
561    ///
562    /// The `name` parameter specifies the name of the generated function parameter
563    /// that will supply data to this singleton at runtime. The generated function will
564    /// accept a plain `T` parameter with this name.
565    fn embedded_singleton_input<T>(&self, name: impl Into<String>) -> Singleton<T, Self, Bounded>
566    where
567        Self: Sized + NoTick,
568    {
569        let ident = syn::Ident::new(&name.into(), Span::call_site());
570
571        Singleton::new(
572            self.clone(),
573            HydroNode::Source {
574                source: HydroSource::EmbeddedSingleton(ident),
575                metadata: self.new_node_metadata(Singleton::<T, Self, Bounded>::collection_kind()),
576            },
577        )
578    }
579
580    /// Establishes a server on this location to receive a bidirectional connection from a single
581    /// client, identified by the given `External` handle. Returns a port handle for the external
582    /// process to connect to, a stream of incoming messages, and a handle to send outgoing
583    /// messages.
584    ///
585    /// # Example
586    /// ```rust
587    /// # #[cfg(feature = "deploy")] {
588    /// # use hydro_lang::prelude::*;
589    /// # use hydro_deploy::Deployment;
590    /// # use futures::{SinkExt, StreamExt};
591    /// # tokio_test::block_on(async {
592    /// # use bytes::Bytes;
593    /// # use hydro_lang::location::NetworkHint;
594    /// # use tokio_util::codec::LengthDelimitedCodec;
595    /// # let mut flow = FlowBuilder::new();
596    /// let node = flow.process::<()>();
597    /// let external = flow.external::<()>();
598    /// let (port, incoming, outgoing) =
599    ///     node.bind_single_client::<_, Bytes, LengthDelimitedCodec>(&external, NetworkHint::Auto);
600    /// outgoing.complete(incoming.map(q!(|data /* : Bytes */| {
601    ///     let mut resp: Vec<u8> = data.into();
602    ///     resp.push(42);
603    ///     resp.into() // : Bytes
604    /// })));
605    ///
606    /// # let mut deployment = Deployment::new();
607    /// let nodes = flow // ... with_process and with_external
608    /// #     .with_process(&node, deployment.Localhost())
609    /// #     .with_external(&external, deployment.Localhost())
610    /// #     .deploy(&mut deployment);
611    ///
612    /// deployment.deploy().await.unwrap();
613    /// deployment.start().await.unwrap();
614    ///
615    /// let (mut external_out, mut external_in) = nodes.connect(port).await;
616    /// external_in.send(vec![1, 2, 3].into()).await.unwrap();
617    /// assert_eq!(
618    ///     external_out.next().await.unwrap().unwrap(),
619    ///     vec![1, 2, 3, 42]
620    /// );
621    /// # });
622    /// # }
623    /// ```
624    #[expect(clippy::type_complexity, reason = "stream markers")]
625    fn bind_single_client<L, T, Codec: Encoder<T> + Decoder>(
626        &self,
627        from: &External<L>,
628        port_hint: NetworkHint,
629    ) -> (
630        ExternalBytesPort<NotMany>,
631        Stream<<Codec as Decoder>::Item, Self, Unbounded, TotalOrder, ExactlyOnce>,
632        ForwardHandle<'a, Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>>,
633    )
634    where
635        Self: Sized + NoTick,
636    {
637        let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
638
639        let (fwd_ref, to_sink) =
640            self.forward_ref::<Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>>();
641        let mut flow_state_borrow = self.flow_state().borrow_mut();
642
643        flow_state_borrow.push_root(HydroRoot::SendExternal {
644            to_external_key: from.key,
645            to_port_id: next_external_port_id,
646            to_many: false,
647            unpaired: false,
648            serialize_fn: None,
649            instantiate_fn: DebugInstantiate::Building,
650            input: Box::new(to_sink.ir_node.replace(HydroNode::Placeholder)),
651            op_metadata: HydroIrOpMetadata::new(),
652        });
653
654        let raw_stream: Stream<
655            Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>,
656            Self,
657            Unbounded,
658            TotalOrder,
659            ExactlyOnce,
660        > = Stream::new(
661            self.clone(),
662            HydroNode::ExternalInput {
663                from_external_key: from.key,
664                from_port_id: next_external_port_id,
665                from_many: false,
666                codec_type: quote_type::<Codec>().into(),
667                port_hint,
668                instantiate_fn: DebugInstantiate::Building,
669                deserialize_fn: None,
670                metadata: self.new_node_metadata(Stream::<
671                    Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>,
672                    Self,
673                    Unbounded,
674                    TotalOrder,
675                    ExactlyOnce,
676                >::collection_kind()),
677            },
678        );
679
680        (
681            ExternalBytesPort {
682                process_key: from.key,
683                port_id: next_external_port_id,
684                _phantom: PhantomData,
685            },
686            raw_stream.flatten_ordered(),
687            fwd_ref,
688        )
689    }
690
691    /// Establishes a bidirectional connection from a single external client using bincode serialization.
692    ///
693    /// Returns a port handle for the external process to connect to, a stream of incoming messages,
694    /// and a handle to send outgoing messages. This is a convenience wrapper around
695    /// [`Location::bind_single_client`] that uses bincode for serialization.
696    ///
697    /// # Type Parameters
698    /// - `InT`: The type of incoming messages (must implement [`DeserializeOwned`])
699    /// - `OutT`: The type of outgoing messages (must implement [`Serialize`])
700    #[expect(clippy::type_complexity, reason = "stream markers")]
701    fn bind_single_client_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
702        &self,
703        from: &External<L>,
704    ) -> (
705        ExternalBincodeBidi<InT, OutT, NotMany>,
706        Stream<InT, Self, Unbounded, TotalOrder, ExactlyOnce>,
707        ForwardHandle<'a, Stream<OutT, Self, Unbounded, TotalOrder, ExactlyOnce>>,
708    )
709    where
710        Self: Sized + NoTick,
711    {
712        let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
713
714        let (fwd_ref, to_sink) =
715            self.forward_ref::<Stream<OutT, Self, Unbounded, TotalOrder, ExactlyOnce>>();
716        let mut flow_state_borrow = self.flow_state().borrow_mut();
717
718        let root = get_this_crate();
719
720        let out_t_type = quote_type::<OutT>();
721        let ser_fn: syn::Expr = syn::parse_quote! {
722            #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#out_t_type, _>(
723                |b| #root::runtime_support::bincode::serialize(&b).unwrap().into()
724            )
725        };
726
727        flow_state_borrow.push_root(HydroRoot::SendExternal {
728            to_external_key: from.key,
729            to_port_id: next_external_port_id,
730            to_many: false,
731            unpaired: false,
732            serialize_fn: Some(ser_fn.into()),
733            instantiate_fn: DebugInstantiate::Building,
734            input: Box::new(to_sink.ir_node.replace(HydroNode::Placeholder)),
735            op_metadata: HydroIrOpMetadata::new(),
736        });
737
738        let in_t_type = quote_type::<InT>();
739
740        let deser_fn: syn::Expr = syn::parse_quote! {
741            |res| {
742                let b = res.unwrap();
743                #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap()
744            }
745        };
746
747        let raw_stream: Stream<InT, Self, Unbounded, TotalOrder, ExactlyOnce> = Stream::new(
748            self.clone(),
749            HydroNode::ExternalInput {
750                from_external_key: from.key,
751                from_port_id: next_external_port_id,
752                from_many: false,
753                codec_type: quote_type::<LengthDelimitedCodec>().into(),
754                port_hint: NetworkHint::Auto,
755                instantiate_fn: DebugInstantiate::Building,
756                deserialize_fn: Some(deser_fn.into()),
757                metadata: self.new_node_metadata(Stream::<
758                    InT,
759                    Self,
760                    Unbounded,
761                    TotalOrder,
762                    ExactlyOnce,
763                >::collection_kind()),
764            },
765        );
766
767        (
768            ExternalBincodeBidi {
769                process_key: from.key,
770                port_id: next_external_port_id,
771                _phantom: PhantomData,
772            },
773            raw_stream,
774            fwd_ref,
775        )
776    }
777
778    /// Establishes a server on this location to receive bidirectional connections from multiple
779    /// external clients using raw bytes.
780    ///
781    /// Unlike [`Location::bind_single_client`], this method supports multiple concurrent client
782    /// connections. Each client is assigned a unique `u64` identifier.
783    ///
784    /// Returns:
785    /// - A port handle for external processes to connect to
786    /// - A keyed stream of incoming messages, keyed by client ID
787    /// - A keyed stream of membership events (client joins/leaves), keyed by client ID
788    /// - A handle to send outgoing messages, keyed by client ID
789    #[expect(clippy::type_complexity, reason = "stream markers")]
790    fn bidi_external_many_bytes<L, T, Codec: Encoder<T> + Decoder>(
791        &self,
792        from: &External<L>,
793        port_hint: NetworkHint,
794    ) -> (
795        ExternalBytesPort<Many>,
796        KeyedStream<u64, <Codec as Decoder>::Item, Self, Unbounded, TotalOrder, ExactlyOnce>,
797        KeyedStream<u64, MembershipEvent, Self, Unbounded, TotalOrder, ExactlyOnce>,
798        ForwardHandle<'a, KeyedStream<u64, T, Self, Unbounded, NoOrder, ExactlyOnce>>,
799    )
800    where
801        Self: Sized + NoTick,
802    {
803        let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
804
805        let (fwd_ref, to_sink) =
806            self.forward_ref::<KeyedStream<u64, T, Self, Unbounded, NoOrder, ExactlyOnce>>();
807        let mut flow_state_borrow = self.flow_state().borrow_mut();
808
809        flow_state_borrow.push_root(HydroRoot::SendExternal {
810            to_external_key: from.key,
811            to_port_id: next_external_port_id,
812            to_many: true,
813            unpaired: false,
814            serialize_fn: None,
815            instantiate_fn: DebugInstantiate::Building,
816            input: Box::new(to_sink.entries().ir_node.replace(HydroNode::Placeholder)),
817            op_metadata: HydroIrOpMetadata::new(),
818        });
819
820        let raw_stream: Stream<
821            Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
822            Self,
823            Unbounded,
824            TotalOrder,
825            ExactlyOnce,
826        > = Stream::new(
827            self.clone(),
828            HydroNode::ExternalInput {
829                from_external_key: from.key,
830                from_port_id: next_external_port_id,
831                from_many: true,
832                codec_type: quote_type::<Codec>().into(),
833                port_hint,
834                instantiate_fn: DebugInstantiate::Building,
835                deserialize_fn: None,
836                metadata: self.new_node_metadata(Stream::<
837                    Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
838                    Self,
839                    Unbounded,
840                    TotalOrder,
841                    ExactlyOnce,
842                >::collection_kind()),
843            },
844        );
845
846        let membership_stream_ident = syn::Ident::new(
847            &format!(
848                "__hydro_deploy_many_{}_{}_membership",
849                from.key, next_external_port_id
850            ),
851            Span::call_site(),
852        );
853        let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
854        let raw_membership_stream: KeyedStream<
855            u64,
856            bool,
857            Self,
858            Unbounded,
859            TotalOrder,
860            ExactlyOnce,
861        > = KeyedStream::new(
862            self.clone(),
863            HydroNode::Source {
864                source: HydroSource::Stream(membership_stream_expr.into()),
865                metadata: self.new_node_metadata(KeyedStream::<
866                    u64,
867                    bool,
868                    Self,
869                    Unbounded,
870                    TotalOrder,
871                    ExactlyOnce,
872                >::collection_kind()),
873            },
874        );
875
876        (
877            ExternalBytesPort {
878                process_key: from.key,
879                port_id: next_external_port_id,
880                _phantom: PhantomData,
881            },
882            raw_stream
883                .flatten_ordered() // TODO(shadaj): this silently drops framing errors, decide on right defaults
884                .into_keyed(),
885            raw_membership_stream.map(q!(|join| {
886                if join {
887                    MembershipEvent::Joined
888                } else {
889                    MembershipEvent::Left
890                }
891            })),
892            fwd_ref,
893        )
894    }
895
896    /// Establishes a server on this location to receive bidirectional connections from multiple
897    /// external clients using bincode serialization.
898    ///
899    /// Unlike [`Location::bind_single_client_bincode`], this method supports multiple concurrent
900    /// client connections. Each client is assigned a unique `u64` identifier.
901    ///
902    /// Returns:
903    /// - A port handle for external processes to connect to
904    /// - A keyed stream of incoming messages, keyed by client ID
905    /// - A keyed stream of membership events (client joins/leaves), keyed by client ID
906    /// - A handle to send outgoing messages, keyed by client ID
907    ///
908    /// # Type Parameters
909    /// - `InT`: The type of incoming messages (must implement [`DeserializeOwned`])
910    /// - `OutT`: The type of outgoing messages (must implement [`Serialize`])
911    #[expect(clippy::type_complexity, reason = "stream markers")]
912    fn bidi_external_many_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
913        &self,
914        from: &External<L>,
915    ) -> (
916        ExternalBincodeBidi<InT, OutT, Many>,
917        KeyedStream<u64, InT, Self, Unbounded, TotalOrder, ExactlyOnce>,
918        KeyedStream<u64, MembershipEvent, Self, Unbounded, TotalOrder, ExactlyOnce>,
919        ForwardHandle<'a, KeyedStream<u64, OutT, Self, Unbounded, NoOrder, ExactlyOnce>>,
920    )
921    where
922        Self: Sized + NoTick,
923    {
924        let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
925
926        let (fwd_ref, to_sink) =
927            self.forward_ref::<KeyedStream<u64, OutT, Self, Unbounded, NoOrder, ExactlyOnce>>();
928        let mut flow_state_borrow = self.flow_state().borrow_mut();
929
930        let root = get_this_crate();
931
932        let out_t_type = quote_type::<OutT>();
933        let ser_fn: syn::Expr = syn::parse_quote! {
934            #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(u64, #out_t_type), _>(
935                |(id, b)| (id, #root::runtime_support::bincode::serialize(&b).unwrap().into())
936            )
937        };
938
939        flow_state_borrow.push_root(HydroRoot::SendExternal {
940            to_external_key: from.key,
941            to_port_id: next_external_port_id,
942            to_many: true,
943            unpaired: false,
944            serialize_fn: Some(ser_fn.into()),
945            instantiate_fn: DebugInstantiate::Building,
946            input: Box::new(to_sink.entries().ir_node.replace(HydroNode::Placeholder)),
947            op_metadata: HydroIrOpMetadata::new(),
948        });
949
950        let in_t_type = quote_type::<InT>();
951
952        let deser_fn: syn::Expr = syn::parse_quote! {
953            |res| {
954                let (id, b) = res.unwrap();
955                (id, #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap())
956            }
957        };
958
959        let raw_stream: KeyedStream<u64, InT, Self, Unbounded, TotalOrder, ExactlyOnce> =
960            KeyedStream::new(
961                self.clone(),
962                HydroNode::ExternalInput {
963                    from_external_key: from.key,
964                    from_port_id: next_external_port_id,
965                    from_many: true,
966                    codec_type: quote_type::<LengthDelimitedCodec>().into(),
967                    port_hint: NetworkHint::Auto,
968                    instantiate_fn: DebugInstantiate::Building,
969                    deserialize_fn: Some(deser_fn.into()),
970                    metadata: self.new_node_metadata(KeyedStream::<
971                        u64,
972                        InT,
973                        Self,
974                        Unbounded,
975                        TotalOrder,
976                        ExactlyOnce,
977                    >::collection_kind()),
978                },
979            );
980
981        let membership_stream_ident = syn::Ident::new(
982            &format!(
983                "__hydro_deploy_many_{}_{}_membership",
984                from.key, next_external_port_id
985            ),
986            Span::call_site(),
987        );
988        let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
989        let raw_membership_stream: KeyedStream<
990            u64,
991            bool,
992            Self,
993            Unbounded,
994            TotalOrder,
995            ExactlyOnce,
996        > = KeyedStream::new(
997            self.clone(),
998            HydroNode::Source {
999                source: HydroSource::Stream(membership_stream_expr.into()),
1000                metadata: self.new_node_metadata(KeyedStream::<
1001                    u64,
1002                    bool,
1003                    Self,
1004                    Unbounded,
1005                    TotalOrder,
1006                    ExactlyOnce,
1007                >::collection_kind()),
1008            },
1009        );
1010
1011        (
1012            ExternalBincodeBidi {
1013                process_key: from.key,
1014                port_id: next_external_port_id,
1015                _phantom: PhantomData,
1016            },
1017            raw_stream,
1018            raw_membership_stream.map(q!(|join| {
1019                if join {
1020                    MembershipEvent::Joined
1021                } else {
1022                    MembershipEvent::Left
1023                }
1024            })),
1025            fwd_ref,
1026        )
1027    }
1028
1029    /// Constructs a [`Singleton`] materialized at this location with the given static value.
1030    ///
1031    /// See also: [`Tick::singleton`], for creating a singleton _within_ a tick, which requires
1032    /// `T: Clone`.
1033    ///
1034    /// # Example
1035    /// ```rust
1036    /// # #[cfg(feature = "deploy")] {
1037    /// # use hydro_lang::prelude::*;
1038    /// # use futures::StreamExt;
1039    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1040    /// let singleton = process.singleton(q!(5));
1041    /// # singleton.into_stream()
1042    /// # }, |mut stream| async move {
1043    /// // 5
1044    /// # assert_eq!(stream.next().await.unwrap(), 5);
1045    /// # }));
1046    /// # }
1047    /// ```
1048    fn singleton<T>(&self, e: impl QuotedWithContext<'a, T, Self>) -> Singleton<T, Self, Bounded>
1049    where
1050        Self: Sized + NoTick,
1051    {
1052        let e = e.splice_untyped_ctx(self);
1053
1054        Singleton::new(
1055            self.clone(),
1056            HydroNode::SingletonSource {
1057                value: e.into(),
1058                first_tick_only: false,
1059                metadata: self.new_node_metadata(Singleton::<T, Self, Bounded>::collection_kind()),
1060            },
1061        )
1062    }
1063
1064    /// Constructs a [`Singleton`] by resolving an async [`Future`] to completion.
1065    ///
1066    /// This is a convenience method equivalent to
1067    /// `self.singleton(future_expr).resolve_future_blocking()`, which is a common
1068    /// pattern when initializing a singleton from an async computation.
1069    ///
1070    /// # Example
1071    /// ```rust
1072    /// # #[cfg(feature = "deploy")] {
1073    /// # use hydro_lang::prelude::*;
1074    /// # use futures::StreamExt;
1075    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1076    /// let singleton = process.singleton_future(q!(async { 42 }));
1077    /// singleton.into_stream()
1078    /// # }, |mut stream| async move {
1079    /// // 42
1080    /// # assert_eq!(stream.next().await.unwrap(), 42);
1081    /// # }));
1082    /// # }
1083    /// ```
1084    ///
1085    /// [`Future`]: std::future::Future
1086    fn singleton_future<F>(
1087        &self,
1088        e: impl QuotedWithContext<'a, F, Self>,
1089    ) -> Singleton<F::Output, Self, Bounded>
1090    where
1091        F: Future,
1092        Self: Sized + NoTick,
1093    {
1094        self.singleton(e).resolve_future_blocking()
1095    }
1096
    /// Generates a stream with values emitted at a fixed interval, with
    /// each value being the current time (as a [`tokio::time::Instant`]).
    ///
    /// The clock source used is monotonic, so elements will be emitted in
    /// increasing order.
    ///
    /// # Arguments
    /// - `interval`: quoted expression producing the [`Duration`] that is passed to
    ///   `tokio::time::interval`
    /// - `_nondet`: non-determinism marker; it is not read by the implementation
    ///
    /// # Non-Determinism
    /// Because this stream is generated by an OS timer, it will be
    /// non-deterministic because each timestamp will be arbitrary.
    ///
    /// # Panics
    /// NOTE(review): `tokio::time::interval` is documented to panic when given a zero
    /// period, so a zero `interval` presumably panics when the timer is created — confirm.
    fn source_interval(
        &self,
        interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
        _nondet: NonDet,
    ) -> Stream<tokio::time::Instant, Self, Unbounded, TotalOrder, ExactlyOnce>
    where
        Self: Sized + NoTick,
    {
        // The quoted expression wraps a `tokio::time::interval` timer in an `IntervalStream`
        // and hands it to `source_stream`.
        self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
            tokio::time::interval(interval)
        )))
    }
1118
    /// Generates a stream with values emitted at a fixed interval (with an
    /// initial delay), with each value being the current time
    /// (as a [`tokio::time::Instant`]).
    ///
    /// The clock source used is monotonic, so elements will be emitted in
    /// increasing order.
    ///
    /// # Arguments
    /// - `delay`: quoted expression producing the [`Duration`] added to
    ///   `tokio::time::Instant::now()` to obtain the start time passed to
    ///   `tokio::time::interval_at`
    /// - `interval`: quoted expression producing the [`Duration`] passed as the period to
    ///   `tokio::time::interval_at`
    /// - `_nondet`: non-determinism marker; it is not read by the implementation
    ///
    /// # Non-Determinism
    /// Because this stream is generated by an OS timer, it will be
    /// non-deterministic because each timestamp will be arbitrary.
    ///
    /// # Panics
    /// NOTE(review): `tokio::time::interval_at` is documented to panic when given a zero
    /// period, so a zero `interval` presumably panics when the timer is created — confirm.
    fn source_interval_delayed(
        &self,
        delay: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
        interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
        _nondet: NonDet,
    ) -> Stream<tokio::time::Instant, Self, Unbounded, TotalOrder, ExactlyOnce>
    where
        Self: Sized + NoTick,
    {
        // The start instant is computed as "now + delay" inside the quoted expression, i.e.
        // relative to whenever that expression is evaluated.
        self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
            tokio::time::interval_at(tokio::time::Instant::now() + delay, interval)
        )))
    }
1142
1143    /// Creates a forward reference for defining recursive or mutually-dependent dataflows.
1144    ///
1145    /// Returns a handle that must be completed with the actual stream, and a placeholder
1146    /// stream that can be used in the dataflow graph before the actual stream is defined.
1147    ///
1148    /// This is useful for implementing feedback loops or recursive computations where
1149    /// a stream depends on its own output.
1150    ///
1151    /// # Example
1152    /// ```rust
1153    /// # #[cfg(feature = "deploy")] {
1154    /// # use hydro_lang::prelude::*;
1155    /// # use hydro_lang::live_collections::stream::NoOrder;
1156    /// # use futures::StreamExt;
1157    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1158    /// // Create a forward reference for the feedback stream
1159    /// let (complete, feedback) = process.forward_ref::<Stream<i32, _, _, NoOrder>>();
1160    ///
1161    /// // Combine initial input with feedback, then increment
1162    /// let input: Stream<_, _, Unbounded> = process.source_iter(q!([1])).into();
1163    /// let output: Stream<_, _, _, NoOrder> = input.merge_unordered(feedback).map(q!(|x| x + 1));
1164    ///
1165    /// // Complete the forward reference with the output
1166    /// complete.complete(output.clone());
1167    /// output
1168    /// # }, |mut stream| async move {
1169    /// // 2, 3, 4, 5, ...
1170    /// # assert_eq!(stream.next().await.unwrap(), 2);
1171    /// # assert_eq!(stream.next().await.unwrap(), 3);
1172    /// # assert_eq!(stream.next().await.unwrap(), 4);
1173    /// # }));
1174    /// # }
1175    /// ```
1176    fn forward_ref<S>(&self) -> (ForwardHandle<'a, S>, S)
1177    where
1178        S: CycleCollection<'a, ForwardRef, Location = Self>,
1179    {
1180        let cycle_id = self.flow_state().borrow_mut().next_cycle_id();
1181        (
1182            ForwardHandle::new(cycle_id, Location::id(self)),
1183            S::create_source(cycle_id, self.clone()),
1184        )
1185    }
1186}
1187
1188#[cfg(feature = "deploy")]
1189#[cfg(test)]
1190mod tests {
1191    use std::collections::HashSet;
1192
1193    use futures::{SinkExt, StreamExt};
1194    use hydro_deploy::Deployment;
1195    use stageleft::q;
1196    use tokio_util::codec::LengthDelimitedCodec;
1197
1198    use crate::compile::builder::FlowBuilder;
1199    use crate::live_collections::stream::{ExactlyOnce, TotalOrder};
1200    use crate::location::{Location, NetworkHint};
1201    use crate::nondet::nondet;
1202
1203    #[tokio::test]
1204    async fn top_level_singleton_replay_cardinality() {
1205        let mut deployment = Deployment::new();
1206
1207        let mut flow = FlowBuilder::new();
1208        let node = flow.process::<()>();
1209        let external = flow.external::<()>();
1210
1211        let (in_port, input) =
1212            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
1213        let singleton = node.singleton(q!(123));
1214        let tick = node.tick();
1215        let out = input
1216            .batch(&tick, nondet!(/** test */))
1217            .cross_singleton(singleton.clone().snapshot(&tick, nondet!(/** test */)))
1218            .cross_singleton(
1219                singleton
1220                    .snapshot(&tick, nondet!(/** test */))
1221                    .into_stream()
1222                    .count(),
1223            )
1224            .all_ticks()
1225            .send_bincode_external(&external);
1226
1227        let nodes = flow
1228            .with_process(&node, deployment.Localhost())
1229            .with_external(&external, deployment.Localhost())
1230            .deploy(&mut deployment);
1231
1232        deployment.deploy().await.unwrap();
1233
1234        let mut external_in = nodes.connect(in_port).await;
1235        let mut external_out = nodes.connect(out).await;
1236
1237        deployment.start().await.unwrap();
1238
1239        external_in.send(1).await.unwrap();
1240        assert_eq!(external_out.next().await.unwrap(), ((1, 123), 1));
1241
1242        external_in.send(2).await.unwrap();
1243        assert_eq!(external_out.next().await.unwrap(), ((2, 123), 1));
1244    }
1245
1246    #[tokio::test]
1247    async fn tick_singleton_replay_cardinality() {
1248        let mut deployment = Deployment::new();
1249
1250        let mut flow = FlowBuilder::new();
1251        let node = flow.process::<()>();
1252        let external = flow.external::<()>();
1253
1254        let (in_port, input) =
1255            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
1256        let tick = node.tick();
1257        let singleton = tick.singleton(q!(123));
1258        let out = input
1259            .batch(&tick, nondet!(/** test */))
1260            .cross_singleton(singleton.clone())
1261            .cross_singleton(singleton.into_stream().count())
1262            .all_ticks()
1263            .send_bincode_external(&external);
1264
1265        let nodes = flow
1266            .with_process(&node, deployment.Localhost())
1267            .with_external(&external, deployment.Localhost())
1268            .deploy(&mut deployment);
1269
1270        deployment.deploy().await.unwrap();
1271
1272        let mut external_in = nodes.connect(in_port).await;
1273        let mut external_out = nodes.connect(out).await;
1274
1275        deployment.start().await.unwrap();
1276
1277        external_in.send(1).await.unwrap();
1278        assert_eq!(external_out.next().await.unwrap(), ((1, 123), 1));
1279
1280        external_in.send(2).await.unwrap();
1281        assert_eq!(external_out.next().await.unwrap(), ((2, 123), 1));
1282    }
1283
1284    #[tokio::test]
1285    async fn external_bytes() {
1286        let mut deployment = Deployment::new();
1287
1288        let mut flow = FlowBuilder::new();
1289        let first_node = flow.process::<()>();
1290        let external = flow.external::<()>();
1291
1292        let (in_port, input) = first_node.source_external_bytes(&external);
1293        let out = input.send_bincode_external(&external);
1294
1295        let nodes = flow
1296            .with_process(&first_node, deployment.Localhost())
1297            .with_external(&external, deployment.Localhost())
1298            .deploy(&mut deployment);
1299
1300        deployment.deploy().await.unwrap();
1301
1302        let mut external_in = nodes.connect(in_port).await.1;
1303        let mut external_out = nodes.connect(out).await;
1304
1305        deployment.start().await.unwrap();
1306
1307        external_in.send(vec![1, 2, 3].into()).await.unwrap();
1308
1309        assert_eq!(external_out.next().await.unwrap(), vec![1, 2, 3]);
1310    }
1311
1312    #[tokio::test]
1313    async fn multi_external_source() {
1314        let mut deployment = Deployment::new();
1315
1316        let mut flow = FlowBuilder::new();
1317        let first_node = flow.process::<()>();
1318        let external = flow.external::<()>();
1319
1320        let (in_port, input, _membership, complete_sink) =
1321            first_node.bidi_external_many_bincode(&external);
1322        let out = input.entries().send_bincode_external(&external);
1323        complete_sink.complete(
1324            first_node
1325                .source_iter::<(u64, ()), _>(q!([]))
1326                .into_keyed()
1327                .weaken_ordering(),
1328        );
1329
1330        let nodes = flow
1331            .with_process(&first_node, deployment.Localhost())
1332            .with_external(&external, deployment.Localhost())
1333            .deploy(&mut deployment);
1334
1335        deployment.deploy().await.unwrap();
1336
1337        let (_, mut external_in_1) = nodes.connect_bincode(in_port.clone()).await;
1338        let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
1339        let external_out = nodes.connect(out).await;
1340
1341        deployment.start().await.unwrap();
1342
1343        external_in_1.send(123).await.unwrap();
1344        external_in_2.send(456).await.unwrap();
1345
1346        assert_eq!(
1347            external_out.take(2).collect::<HashSet<_>>().await,
1348            vec![(0, 123), (1, 456)].into_iter().collect()
1349        );
1350    }
1351
1352    #[tokio::test]
1353    async fn second_connection_only_multi_source() {
1354        let mut deployment = Deployment::new();
1355
1356        let mut flow = FlowBuilder::new();
1357        let first_node = flow.process::<()>();
1358        let external = flow.external::<()>();
1359
1360        let (in_port, input, _membership, complete_sink) =
1361            first_node.bidi_external_many_bincode(&external);
1362        let out = input.entries().send_bincode_external(&external);
1363        complete_sink.complete(
1364            first_node
1365                .source_iter::<(u64, ()), _>(q!([]))
1366                .into_keyed()
1367                .weaken_ordering(),
1368        );
1369
1370        let nodes = flow
1371            .with_process(&first_node, deployment.Localhost())
1372            .with_external(&external, deployment.Localhost())
1373            .deploy(&mut deployment);
1374
1375        deployment.deploy().await.unwrap();
1376
1377        // intentionally skipped to test stream waking logic
1378        let (_, mut _external_in_1) = nodes.connect_bincode(in_port.clone()).await;
1379        let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
1380        let mut external_out = nodes.connect(out).await;
1381
1382        deployment.start().await.unwrap();
1383
1384        external_in_2.send(456).await.unwrap();
1385
1386        assert_eq!(external_out.next().await.unwrap(), (1, 456));
1387    }
1388
1389    #[tokio::test]
1390    async fn multi_external_bytes() {
1391        let mut deployment = Deployment::new();
1392
1393        let mut flow = FlowBuilder::new();
1394        let first_node = flow.process::<()>();
1395        let external = flow.external::<()>();
1396
1397        let (in_port, input, _membership, complete_sink) = first_node
1398            .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1399        let out = input.entries().send_bincode_external(&external);
1400        complete_sink.complete(
1401            first_node
1402                .source_iter(q!([]))
1403                .into_keyed()
1404                .weaken_ordering(),
1405        );
1406
1407        let nodes = flow
1408            .with_process(&first_node, deployment.Localhost())
1409            .with_external(&external, deployment.Localhost())
1410            .deploy(&mut deployment);
1411
1412        deployment.deploy().await.unwrap();
1413
1414        let mut external_in_1 = nodes.connect(in_port.clone()).await.1;
1415        let mut external_in_2 = nodes.connect(in_port).await.1;
1416        let external_out = nodes.connect(out).await;
1417
1418        deployment.start().await.unwrap();
1419
1420        external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
1421        external_in_2.send(vec![4, 5].into()).await.unwrap();
1422
1423        assert_eq!(
1424            external_out.take(2).collect::<HashSet<_>>().await,
1425            vec![
1426                (0, (&[1u8, 2, 3] as &[u8]).into()),
1427                (1, (&[4u8, 5] as &[u8]).into())
1428            ]
1429            .into_iter()
1430            .collect()
1431        );
1432    }
1433
1434    #[tokio::test]
1435    async fn single_client_external_bytes() {
1436        let mut deployment = Deployment::new();
1437        let mut flow = FlowBuilder::new();
1438        let first_node = flow.process::<()>();
1439        let external = flow.external::<()>();
1440        let (port, input, complete_sink) = first_node
1441            .bind_single_client::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1442        complete_sink.complete(input.map(q!(|data| {
1443            let mut resp: Vec<u8> = data.into();
1444            resp.push(42);
1445            resp.into() // : Bytes
1446        })));
1447
1448        let nodes = flow
1449            .with_process(&first_node, deployment.Localhost())
1450            .with_external(&external, deployment.Localhost())
1451            .deploy(&mut deployment);
1452
1453        deployment.deploy().await.unwrap();
1454        deployment.start().await.unwrap();
1455
1456        let (mut external_out, mut external_in) = nodes.connect(port).await;
1457
1458        external_in.send(vec![1, 2, 3].into()).await.unwrap();
1459        assert_eq!(
1460            external_out.next().await.unwrap().unwrap(),
1461            vec![1, 2, 3, 42]
1462        );
1463    }
1464
1465    #[tokio::test]
1466    async fn echo_external_bytes() {
1467        let mut deployment = Deployment::new();
1468
1469        let mut flow = FlowBuilder::new();
1470        let first_node = flow.process::<()>();
1471        let external = flow.external::<()>();
1472
1473        let (port, input, _membership, complete_sink) = first_node
1474            .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1475        complete_sink
1476            .complete(input.map(q!(|bytes| { bytes.into_iter().map(|x| x + 1).collect() })));
1477
1478        let nodes = flow
1479            .with_process(&first_node, deployment.Localhost())
1480            .with_external(&external, deployment.Localhost())
1481            .deploy(&mut deployment);
1482
1483        deployment.deploy().await.unwrap();
1484
1485        let (mut external_out_1, mut external_in_1) = nodes.connect(port.clone()).await;
1486        let (mut external_out_2, mut external_in_2) = nodes.connect(port).await;
1487
1488        deployment.start().await.unwrap();
1489
1490        external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
1491        external_in_2.send(vec![4, 5].into()).await.unwrap();
1492
1493        assert_eq!(external_out_1.next().await.unwrap().unwrap(), vec![2, 3, 4]);
1494        assert_eq!(external_out_2.next().await.unwrap().unwrap(), vec![5, 6]);
1495    }
1496
1497    #[tokio::test]
1498    async fn echo_external_bincode() {
1499        let mut deployment = Deployment::new();
1500
1501        let mut flow = FlowBuilder::new();
1502        let first_node = flow.process::<()>();
1503        let external = flow.external::<()>();
1504
1505        let (port, input, _membership, complete_sink) =
1506            first_node.bidi_external_many_bincode(&external);
1507        complete_sink.complete(input.map(q!(|text: String| { text.to_uppercase() })));
1508
1509        let nodes = flow
1510            .with_process(&first_node, deployment.Localhost())
1511            .with_external(&external, deployment.Localhost())
1512            .deploy(&mut deployment);
1513
1514        deployment.deploy().await.unwrap();
1515
1516        let (mut external_out_1, mut external_in_1) = nodes.connect_bincode(port.clone()).await;
1517        let (mut external_out_2, mut external_in_2) = nodes.connect_bincode(port).await;
1518
1519        deployment.start().await.unwrap();
1520
1521        external_in_1.send("hi".to_owned()).await.unwrap();
1522        external_in_2.send("hello".to_owned()).await.unwrap();
1523
1524        assert_eq!(external_out_1.next().await.unwrap(), "HI");
1525        assert_eq!(external_out_2.next().await.unwrap(), "HELLO");
1526    }
1527
1528    #[tokio::test]
1529    async fn closure_location_name() {
1530        let mut deployment = Deployment::new();
1531        let mut flow = FlowBuilder::new();
1532
1533        enum ClosureProcess {}
1534
1535        let node = flow.process::<ClosureProcess>();
1536        let external = flow.external::<()>();
1537
1538        let (in_port, input) =
1539            node.source_external_bincode::<_, i32, TotalOrder, ExactlyOnce>(&external);
1540        let out = input.send_bincode_external(&external);
1541
1542        let nodes = flow
1543            .with_process(&node, deployment.Localhost())
1544            .with_external(&external, deployment.Localhost())
1545            .deploy(&mut deployment);
1546
1547        deployment.deploy().await.unwrap();
1548
1549        let mut external_in = nodes.connect(in_port).await;
1550        let mut external_out = nodes.connect(out).await;
1551
1552        deployment.start().await.unwrap();
1553
1554        external_in.send(42).await.unwrap();
1555        assert_eq!(external_out.next().await.unwrap(), 42);
1556    }
1557}