Skip to main content

hydro_lang/location/
cluster.rs

1//! Definitions for clusters, which represent a group of identical processes.
2//!
3//! A [`Cluster`] is a multi-node location in the Hydro distributed programming model.
4//! Unlike a [`super::Process`], which maps to a single machine, a cluster represents
5//! a dynamically-sized set of machines that all run the same code. Each member of the
6//! cluster is assigned a unique [`super::MemberId`] that can be used to address it.
7//!
8//! Clusters are useful for parallelism, replication, and sharding patterns. Data can
9//! be broadcast to all members, sent to a specific member by ID, or scattered across
10//! members.
11
12use std::fmt::{Debug, Formatter};
13use std::marker::PhantomData;
14
15use proc_macro2::Span;
16use quote::quote;
17use stageleft::runtime_support::{FreeVariableWithContextWithProps, QuoteTokens};
18use stageleft::{QuotedWithContextWithProps, quote_type};
19
20use super::dynamic::LocationId;
21use super::{Location, MemberId};
22use crate::compile::builder::FlowState;
23use crate::location::LocationKey;
24use crate::location::member_id::TaglessMemberId;
25use crate::staging_util::{Invariant, get_this_crate};
26
27/// A multi-node location representing a group of identical processes.
28///
29/// Each member of the cluster runs the same dataflow program and is assigned a
30/// unique [`MemberId`] that can be used to address it. The number of members
31/// is determined at deployment time rather than at compile time.
32///
33/// The `ClusterTag` type parameter is a phantom tag used to distinguish between
34/// different clusters in the type system, preventing accidental mixing of
35/// member IDs across clusters.
36pub struct Cluster<'a, ClusterTag> {
37    pub(crate) key: LocationKey,
38    pub(crate) flow_state: FlowState,
39    pub(crate) _phantom: Invariant<'a, ClusterTag>,
40}
41
42impl<C> Debug for Cluster<'_, C> {
43    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
44        write!(f, "Cluster({})", self.key)
45    }
46}
47
48impl<C> Eq for Cluster<'_, C> {}
49impl<C> PartialEq for Cluster<'_, C> {
50    fn eq(&self, other: &Self) -> bool {
51        self.key == other.key && FlowState::ptr_eq(&self.flow_state, &other.flow_state)
52    }
53}
54
55impl<C> Clone for Cluster<'_, C> {
56    fn clone(&self) -> Self {
57        Cluster {
58            key: self.key,
59            flow_state: self.flow_state.clone(),
60            _phantom: PhantomData,
61        }
62    }
63}
64
65impl<'a, C> super::dynamic::DynLocation for Cluster<'a, C> {
66    fn id(&self) -> LocationId {
67        LocationId::Cluster(self.key)
68    }
69
70    fn flow_state(&self) -> &FlowState {
71        &self.flow_state
72    }
73
74    fn is_top_level() -> bool {
75        true
76    }
77
78    fn multiversioned(&self) -> bool {
79        false // TODO(shadaj): enable multiversioning support for clusters
80    }
81}
82
83impl<'a, C> Location<'a> for Cluster<'a, C> {
84    type Root = Cluster<'a, C>;
85
86    fn root(&self) -> Self::Root {
87        self.clone()
88    }
89}
90
#[cfg(feature = "sim")]
impl<'a, C> Cluster<'a, C> {
    /// Creates a simulator-driven input for this cluster, intended for tests.
    ///
    /// The result is a pair:
    /// - a `SimClusterSender` whose messages are `(member_id, T)` pairs, each
    ///   aimed at one particular member of the cluster, and
    /// - the `Stream<T>` on which each member receives its messages.
    #[expect(clippy::type_complexity, reason = "stream markers")]
    pub fn sim_input<T>(
        &self,
    ) -> (
        crate::sim::SimClusterSender<
            T,
            crate::live_collections::stream::TotalOrder,
            crate::live_collections::stream::ExactlyOnce,
        >,
        crate::live_collections::Stream<
            T,
            Self,
            crate::live_collections::boundedness::Unbounded,
            crate::live_collections::stream::TotalOrder,
            crate::live_collections::stream::ExactlyOnce,
        >,
    )
    where
        T: serde::Serialize + serde::de::DeserializeOwned,
    {
        use crate::location::{External, Location};

        // The input is sourced from an external location built on the spot: it
        // uses `LocationKey::FIRST` and shares this cluster's flow state.
        let flow_state = self.flow_state.clone();
        let sim_source: External<'a, ()> = External {
            key: LocationKey::FIRST,
            flow_state,
            _phantom: PhantomData,
        };

        let (port, stream) = self.source_external_bincode(&sim_source);
        let sender = crate::sim::SimClusterSender(port.port_id, PhantomData);

        (sender, stream)
    }
}
133
134/// A free variable that resolves to the list of member IDs in a cluster at runtime.
135///
136/// When spliced into a quoted snippet, this provides access to the set of
137/// [`TaglessMemberId`]s that belong to the cluster.
138pub struct ClusterIds<'a> {
139    /// The location key identifying which cluster this refers to.
140    pub key: LocationKey,
141    /// Phantom data binding the lifetime.
142    pub _phantom: PhantomData<&'a ()>,
143}
144
145impl<'a> Clone for ClusterIds<'a> {
146    fn clone(&self) -> Self {
147        Self {
148            key: self.key,
149            _phantom: Default::default(),
150        }
151    }
152}
153
154impl<'a, Ctx> FreeVariableWithContextWithProps<Ctx, ()> for ClusterIds<'a> {
155    type O = &'a [TaglessMemberId];
156
157    fn to_tokens(self, _ctx: &Ctx) -> (QuoteTokens, ())
158    where
159        Self: Sized,
160    {
161        let ident = syn::Ident::new(
162            &format!("__hydro_lang_cluster_ids_{}", self.key),
163            Span::call_site(),
164        );
165
166        (
167            QuoteTokens {
168                prelude: None,
169                expr: Some(quote! { #ident }),
170            },
171            (),
172        )
173    }
174}
175
176impl<'a, Ctx> QuotedWithContextWithProps<'a, &'a [TaglessMemberId], Ctx, ()> for ClusterIds<'a> {}
177
178/// Marker trait implemented by [`Cluster`] locations, providing access to the cluster tag type.
179pub trait IsCluster {
180    /// The phantom tag type that distinguishes this cluster from others.
181    type Tag;
182}
183
184impl<C> IsCluster for Cluster<'_, C> {
185    type Tag = C;
186}
187
/// A free variable standing for the ID of the cluster member the code runs on.
/// Spliced into a quoted snippet that will run on a cluster, it becomes a [`MemberId`].
pub static CLUSTER_SELF_ID: ClusterSelfId<'static> = ClusterSelfId { _private: &() };

/// The type of [`CLUSTER_SELF_ID`].
///
/// A value of this type is a staging-time placeholder: spliced into a quoted
/// snippet running on a [`Cluster`], it resolves to the [`MemberId`] of the
/// current cluster member. The only field is private, so code outside this
/// module cannot build one with a struct literal.
#[derive(Clone, Copy)]
pub struct ClusterSelfId<'a> {
    _private: &'a (),
}
200
201impl<'a, L> FreeVariableWithContextWithProps<L, ()> for ClusterSelfId<'a>
202where
203    L: Location<'a>,
204    <L as Location<'a>>::Root: IsCluster,
205{
206    type O = MemberId<<<L as Location<'a>>::Root as IsCluster>::Tag>;
207
208    fn to_tokens(self, ctx: &L) -> (QuoteTokens, ())
209    where
210        Self: Sized,
211    {
212        let cluster_id = if let LocationId::Cluster(id) = ctx.root().id() {
213            id
214        } else {
215            unreachable!()
216        };
217
218        let ident = syn::Ident::new(
219            &format!("__hydro_lang_cluster_self_id_{}", cluster_id),
220            Span::call_site(),
221        );
222        let root = get_this_crate();
223        let c_type: syn::Type = quote_type::<<<L as Location<'a>>::Root as IsCluster>::Tag>();
224
225        (
226            QuoteTokens {
227                prelude: None,
228                expr: Some(
229                    quote! { #root::__staged::location::MemberId::<#c_type>::from_tagless((#ident).clone()) },
230                ),
231            },
232            (),
233        )
234    }
235}
236
237impl<'a, L>
238    QuotedWithContextWithProps<'a, MemberId<<<L as Location<'a>>::Root as IsCluster>::Tag>, L, ()>
239    for ClusterSelfId<'a>
240where
241    L: Location<'a>,
242    <L as Location<'a>>::Root: IsCluster,
243{
244}
245
#[cfg(test)]
mod tests {
    #[cfg(feature = "sim")]
    use stageleft::q;

    #[cfg(feature = "sim")]
    use super::CLUSTER_SELF_ID;
    #[cfg(feature = "sim")]
    use crate::location::{Location, MemberId, MembershipEvent};
    #[cfg(feature = "sim")]
    use crate::networking::TCP;
    #[cfg(feature = "sim")]
    use crate::nondet::nondet;
    #[cfg(feature = "sim")]
    use crate::prelude::FlowBuilder;

    /// Every member of two clusters (sizes 3 and 4) sends its own ID to a single
    /// process; the merged output must contain raw IDs `0..3` and `0..4`.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_cluster_self_id() {
        let mut flow = FlowBuilder::new();
        let cluster1 = flow.cluster::<()>();
        let cluster2 = flow.cluster::<()>();

        let node = flow.process::<()>();

        let out_recv = cluster1
            .source_iter(q!(vec![CLUSTER_SELF_ID]))
            .send(&node, TCP.fail_stop().bincode())
            .values()
            .merge_unordered(
                cluster2
                    .source_iter(q!(vec![CLUSTER_SELF_ID]))
                    .send(&node, TCP.fail_stop().bincode())
                    .values(),
            )
            .sim_output();

        flow.sim()
            .with_cluster_size(&cluster1, 3)
            .with_cluster_size(&cluster2, 4)
            .exhaustive(async || {
                out_recv
                    .assert_yields_only_unordered([0, 1, 2, 0, 1, 2, 3].map(MemberId::from_raw_id))
                    .await
            });
    }

    /// Each of two cluster members batches `[1, 2, 3]` into ticks and sends its
    /// per-tick counts to a process; summed per member, the counts must be 3.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_cluster_with_tick() {
        use std::collections::HashMap;

        let mut flow = FlowBuilder::new();
        let cluster = flow.cluster::<()>();
        let node = flow.process::<()>();

        let out_recv = cluster
            .source_iter(q!(vec![1, 2, 3]))
            .batch(&cluster.tick(), nondet!(/** test */))
            .count()
            .all_ticks()
            .send(&node, TCP.fail_stop().bincode())
            .entries()
            .map(q!(|(id, v)| (id, v)))
            .sim_output();

        let count = flow
            .sim()
            .with_cluster_size(&cluster, 2)
            .exhaustive(async || {
                let grouped = out_recv.collect_sorted::<Vec<_>>().await.into_iter().fold(
                    HashMap::new(),
                    |mut acc: HashMap<MemberId<()>, usize>, (id, v)| {
                        *acc.entry(id).or_default() += v;
                        acc
                    },
                );

                // `assert_eq!` reports both operands when the check fails.
                assert_eq!(grouped.len(), 2);
                for total in grouped.into_values() {
                    assert_eq!(total, 3);
                }
            });

        // not a square because we simulate all interleavings of ticks across 2 cluster members
        // eventually, we should be able to identify that the members are independent (because
        // there are no dataflow cycles) and avoid simulating redundant interleavings
        assert_eq!(count, 106);
    }

    /// A process observing a two-member cluster must see exactly one `Joined`
    /// membership event for each of the members `0` and `1`.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_cluster_membership() {
        let mut flow = FlowBuilder::new();
        let cluster = flow.cluster::<()>();
        let node = flow.process::<()>();

        let out_recv = node
            .source_cluster_members(&cluster)
            .entries()
            .map(q!(|(id, v)| (id, v)))
            .sim_output();

        flow.sim()
            .with_cluster_size(&cluster, 2)
            .exhaustive(async || {
                out_recv
                    .assert_yields_only_unordered(vec![
                        (MemberId::from_raw_id(0), MembershipEvent::Joined),
                        (MemberId::from_raw_id(1), MembershipEvent::Joined),
                    ])
                    .await;
            });
    }
}