1use std::fmt::{Debug, Formatter};
13use std::marker::PhantomData;
14
15use proc_macro2::Span;
16use quote::quote;
17use stageleft::runtime_support::{FreeVariableWithContextWithProps, QuoteTokens};
18use stageleft::{QuotedWithContextWithProps, quote_type};
19
20use super::dynamic::LocationId;
21use super::{Location, MemberId};
22use crate::compile::builder::FlowState;
23use crate::location::LocationKey;
24use crate::location::member_id::TaglessMemberId;
25use crate::staging_util::{Invariant, get_this_crate};
26
27pub struct Cluster<'a, ClusterTag> {
37 pub(crate) key: LocationKey,
38 pub(crate) flow_state: FlowState,
39 pub(crate) _phantom: Invariant<'a, ClusterTag>,
40}
41
42impl<C> Debug for Cluster<'_, C> {
43 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
44 write!(f, "Cluster({})", self.key)
45 }
46}
47
48impl<C> Eq for Cluster<'_, C> {}
49impl<C> PartialEq for Cluster<'_, C> {
50 fn eq(&self, other: &Self) -> bool {
51 self.key == other.key && FlowState::ptr_eq(&self.flow_state, &other.flow_state)
52 }
53}
54
55impl<C> Clone for Cluster<'_, C> {
56 fn clone(&self) -> Self {
57 Cluster {
58 key: self.key,
59 flow_state: self.flow_state.clone(),
60 _phantom: PhantomData,
61 }
62 }
63}
64
65impl<'a, C> super::dynamic::DynLocation for Cluster<'a, C> {
66 fn id(&self) -> LocationId {
67 LocationId::Cluster(self.key)
68 }
69
70 fn flow_state(&self) -> &FlowState {
71 &self.flow_state
72 }
73
74 fn is_top_level() -> bool {
75 true
76 }
77
78 fn multiversioned(&self) -> bool {
79 false }
81}
82
83impl<'a, C> Location<'a> for Cluster<'a, C> {
84 type Root = Cluster<'a, C>;
85
86 fn root(&self) -> Self::Root {
87 self.clone()
88 }
89}
90
#[cfg(feature = "sim")]
impl<'a, C> Cluster<'a, C> {
    /// Creates a simulator-driven input for this cluster.
    ///
    /// Returns a pair of:
    /// - a `SimClusterSender` wrapping the port id of the newly created
    ///   external input, and
    /// - the unbounded, totally-ordered, exactly-once `Stream` located at this
    ///   cluster that receives what is sent through that port.
    ///
    /// Internally this builds an `External` location handle with key
    /// `LocationKey::FIRST` that shares this cluster's flow state, and wires
    /// it up through `source_external_bincode`.
    #[expect(clippy::type_complexity, reason = "stream markers")]
    pub fn sim_input<T>(
        &self,
    ) -> (
        crate::sim::SimClusterSender<
            T,
            crate::live_collections::stream::TotalOrder,
            crate::live_collections::stream::ExactlyOnce,
        >,
        crate::live_collections::Stream<
            T,
            Self,
            crate::live_collections::boundedness::Unbounded,
            crate::live_collections::stream::TotalOrder,
            crate::live_collections::stream::ExactlyOnce,
        >,
    )
    where
        T: serde::Serialize + serde::de::DeserializeOwned,
    {
        use crate::location::Location;

        // External handle used purely as the sending side of the input.
        let shared_state = self.flow_state.clone();
        let sim_external: crate::location::External<'a, ()> = crate::location::External {
            key: LocationKey::FIRST,
            flow_state: shared_state,
            _phantom: PhantomData,
        };

        let (port, input_stream) = self.source_external_bincode(&sim_external);
        let sender = crate::sim::SimClusterSender(port.port_id, PhantomData);

        (sender, input_stream)
    }
}
133
134pub struct ClusterIds<'a> {
139 pub key: LocationKey,
141 pub _phantom: PhantomData<&'a ()>,
143}
144
145impl<'a> Clone for ClusterIds<'a> {
146 fn clone(&self) -> Self {
147 Self {
148 key: self.key,
149 _phantom: Default::default(),
150 }
151 }
152}
153
154impl<'a, Ctx> FreeVariableWithContextWithProps<Ctx, ()> for ClusterIds<'a> {
155 type O = &'a [TaglessMemberId];
156
157 fn to_tokens(self, _ctx: &Ctx) -> (QuoteTokens, ())
158 where
159 Self: Sized,
160 {
161 let ident = syn::Ident::new(
162 &format!("__hydro_lang_cluster_ids_{}", self.key),
163 Span::call_site(),
164 );
165
166 (
167 QuoteTokens {
168 prelude: None,
169 expr: Some(quote! { #ident }),
170 },
171 (),
172 )
173 }
174}
175
176impl<'a, Ctx> QuotedWithContextWithProps<'a, &'a [TaglessMemberId], Ctx, ()> for ClusterIds<'a> {}
177
/// Implemented by location types that are clusters, exposing their tag type.
///
/// Used as a bound on a location's root so that `ClusterSelfId` can name the
/// `MemberId<Tag>` type it expands to.
pub trait IsCluster {
    /// The type-level tag of the cluster (the `C` in `Cluster<'_, C>`).
    type Tag;
}
183
184impl<C> IsCluster for Cluster<'_, C> {
185 type Tag = C;
186}
187
188pub static CLUSTER_SELF_ID: ClusterSelfId = ClusterSelfId { _private: &() };
191
/// Type of the `CLUSTER_SELF_ID` placeholder.
///
/// The only field is private, so code outside this module cannot construct a
/// value with a struct literal and has to go through `CLUSTER_SELF_ID`.
#[derive(Clone, Copy)]
pub struct ClusterSelfId<'a> {
    /// Private unit reference; carries the lifetime and blocks outside
    /// construction.
    _private: &'a (),
}
200
201impl<'a, L> FreeVariableWithContextWithProps<L, ()> for ClusterSelfId<'a>
202where
203 L: Location<'a>,
204 <L as Location<'a>>::Root: IsCluster,
205{
206 type O = MemberId<<<L as Location<'a>>::Root as IsCluster>::Tag>;
207
208 fn to_tokens(self, ctx: &L) -> (QuoteTokens, ())
209 where
210 Self: Sized,
211 {
212 let cluster_id = if let LocationId::Cluster(id) = ctx.root().id() {
213 id
214 } else {
215 unreachable!()
216 };
217
218 let ident = syn::Ident::new(
219 &format!("__hydro_lang_cluster_self_id_{}", cluster_id),
220 Span::call_site(),
221 );
222 let root = get_this_crate();
223 let c_type: syn::Type = quote_type::<<<L as Location<'a>>::Root as IsCluster>::Tag>();
224
225 (
226 QuoteTokens {
227 prelude: None,
228 expr: Some(
229 quote! { #root::__staged::location::MemberId::<#c_type>::from_tagless((#ident).clone()) },
230 ),
231 },
232 (),
233 )
234 }
235}
236
237impl<'a, L>
238 QuotedWithContextWithProps<'a, MemberId<<<L as Location<'a>>::Root as IsCluster>::Tag>, L, ()>
239 for ClusterSelfId<'a>
240where
241 L: Location<'a>,
242 <L as Location<'a>>::Root: IsCluster,
243{
244}
245
#[cfg(test)]
mod tests {
    #[cfg(feature = "sim")]
    use stageleft::q;

    #[cfg(feature = "sim")]
    use super::CLUSTER_SELF_ID;
    #[cfg(feature = "sim")]
    use crate::location::{Location, MemberId, MembershipEvent};
    #[cfg(feature = "sim")]
    use crate::networking::TCP;
    #[cfg(feature = "sim")]
    use crate::nondet::nondet;
    #[cfg(feature = "sim")]
    use crate::prelude::FlowBuilder;

    /// Each member of two differently sized clusters sends its own
    /// `CLUSTER_SELF_ID` to a process; the process must see ids `0..3` from
    /// the first cluster and `0..4` from the second.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_cluster_self_id() {
        let mut flow = FlowBuilder::new();
        let cluster1 = flow.cluster::<()>();
        let cluster2 = flow.cluster::<()>();

        let node = flow.process::<()>();

        let out_recv = cluster1
            .source_iter(q!(vec![CLUSTER_SELF_ID]))
            .send(&node, TCP.fail_stop().bincode())
            .values()
            .merge_unordered(
                cluster2
                    .source_iter(q!(vec![CLUSTER_SELF_ID]))
                    .send(&node, TCP.fail_stop().bincode())
                    .values(),
            )
            .sim_output();

        flow.sim()
            .with_cluster_size(&cluster1, 3)
            .with_cluster_size(&cluster2, 4)
            .exhaustive(async || {
                out_recv
                    .assert_yields_only_unordered([0, 1, 2, 0, 1, 2, 3].map(MemberId::from_raw_id))
                    .await
            });
    }

    /// Each of two cluster members batches `[1, 2, 3]` per tick and sends the
    /// per-tick counts to a process; summed per member, the counts must be 3.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_cluster_with_tick() {
        use std::collections::HashMap;

        let mut flow = FlowBuilder::new();
        let cluster = flow.cluster::<()>();
        let node = flow.process::<()>();

        let out_recv = cluster
            .source_iter(q!(vec![1, 2, 3]))
            .batch(&cluster.tick(), nondet!())
            .count()
            .all_ticks()
            .send(&node, TCP.fail_stop().bincode())
            .entries()
            .map(q!(|(id, v)| (id, v)))
            .sim_output();

        let count = flow
            .sim()
            .with_cluster_size(&cluster, 2)
            .exhaustive(async || {
                // Sum the per-tick counts received from each member.
                let grouped = out_recv.collect_sorted::<Vec<_>>().await.into_iter().fold(
                    HashMap::new(),
                    |mut acc: HashMap<MemberId<()>, usize>, (id, v)| {
                        *acc.entry(id).or_default() += v;
                        acc
                    },
                );

                // `assert_eq!` so a failure reports the observed values.
                assert_eq!(grouped.len(), 2);
                for (_id, v) in grouped {
                    assert_eq!(v, 3);
                }
            });

        // NOTE(review): `count` is whatever `exhaustive` returns, presumably the
        // number of explored executions; confirm before changing 106.
        assert_eq!(count, 106);
    }

    /// A process observing the membership of a two-member cluster must see
    /// exactly one `Joined` event per member.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_cluster_membership() {
        let mut flow = FlowBuilder::new();
        let cluster = flow.cluster::<()>();
        let node = flow.process::<()>();

        let out_recv = node
            .source_cluster_members(&cluster)
            .entries()
            .map(q!(|(id, v)| (id, v)))
            .sim_output();

        flow.sim()
            .with_cluster_size(&cluster, 2)
            .exhaustive(async || {
                out_recv
                    .assert_yields_only_unordered(vec![
                        (MemberId::from_raw_id(0), MembershipEvent::Joined),
                        (MemberId::from_raw_id(1), MembershipEvent::Joined),
                    ])
                    .await;
            });
    }
}