1use core::panic;
2use std::cell::RefCell;
3use std::collections::HashMap;
4#[cfg(feature = "build")]
5use std::collections::HashSet;
6use std::fmt::{Debug, Display};
7use std::hash::{Hash, Hasher};
8use std::ops::Deref;
9use std::rc::Rc;
10
11#[cfg(feature = "build")]
12use dfir_lang::graph::FlatGraphBuilder;
13#[cfg(feature = "build")]
14use proc_macro2::Span;
15use proc_macro2::TokenStream;
16use quote::ToTokens;
17#[cfg(feature = "build")]
18use quote::quote;
19#[cfg(feature = "build")]
20use slotmap::{SecondaryMap, SparseSecondaryMap};
21#[cfg(feature = "build")]
22use syn::parse_quote;
23use syn::visit::{self, Visit};
24use syn::visit_mut::VisitMut;
25
26#[cfg(feature = "build")]
27use crate::compile::builder::ClockId;
28use crate::compile::builder::{CycleId, ExternalPortId};
29#[cfg(feature = "build")]
30use crate::compile::deploy_provider::{Deploy, Node, RegisterPort};
31use crate::location::dynamic::LocationId;
32use crate::location::{LocationKey, NetworkHint};
33
34pub mod backtrace;
35use backtrace::Backtrace;
36
37#[derive(Clone, Hash)]
41pub struct DebugExpr(pub Box<syn::Expr>);
42
43impl From<syn::Expr> for DebugExpr {
44 fn from(expr: syn::Expr) -> Self {
45 Self(Box::new(expr))
46 }
47}
48
49impl Deref for DebugExpr {
50 type Target = syn::Expr;
51
52 fn deref(&self) -> &Self::Target {
53 &self.0
54 }
55}
56
57impl ToTokens for DebugExpr {
58 fn to_tokens(&self, tokens: &mut TokenStream) {
59 self.0.to_tokens(tokens);
60 }
61}
62
63impl Debug for DebugExpr {
64 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
65 write!(f, "{}", self.0.to_token_stream())
66 }
67}
68
69impl Display for DebugExpr {
70 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71 let original = self.0.as_ref().clone();
72 let simplified = simplify_q_macro(original);
73
74 write!(f, "q!({})", quote::quote!(#simplified))
77 }
78}
79
80fn simplify_q_macro(mut expr: syn::Expr) -> syn::Expr {
82 let mut simplifier = QMacroSimplifier::new();
85 simplifier.visit_expr_mut(&mut expr);
86
87 if let Some(simplified) = simplifier.simplified_result {
89 simplified
90 } else {
91 expr
92 }
93}
94
95#[derive(Default)]
97pub struct QMacroSimplifier {
98 pub simplified_result: Option<syn::Expr>,
99}
100
101impl QMacroSimplifier {
102 pub fn new() -> Self {
103 Self::default()
104 }
105}
106
107impl VisitMut for QMacroSimplifier {
108 fn visit_expr_mut(&mut self, expr: &mut syn::Expr) {
109 if self.simplified_result.is_some() {
111 return;
112 }
113
114 if let syn::Expr::Call(call) = expr && let syn::Expr::Path(path_expr) = call.func.as_ref()
115 && self.is_stageleft_runtime_support_call(&path_expr.path)
117 && let Some(closure) = self.extract_closure_from_args(&call.args)
119 {
120 self.simplified_result = Some(closure);
121 return;
122 }
123
124 syn::visit_mut::visit_expr_mut(self, expr);
127 }
128}
129
130impl QMacroSimplifier {
131 fn is_stageleft_runtime_support_call(&self, path: &syn::Path) -> bool {
132 if let Some(last_segment) = path.segments.last() {
134 let fn_name = last_segment.ident.to_string();
135 fn_name.contains("_type_hint")
137 && path.segments.len() > 2
138 && path.segments[0].ident == "stageleft"
139 && path.segments[1].ident == "runtime_support"
140 } else {
141 false
142 }
143 }
144
145 fn extract_closure_from_args(
146 &self,
147 args: &syn::punctuated::Punctuated<syn::Expr, syn::Token![,]>,
148 ) -> Option<syn::Expr> {
149 for arg in args {
151 if let syn::Expr::Closure(_) = arg {
152 return Some(arg.clone());
153 }
154 if let Some(closure_expr) = self.find_closure_in_expr(arg) {
156 return Some(closure_expr);
157 }
158 }
159 None
160 }
161
162 fn find_closure_in_expr(&self, expr: &syn::Expr) -> Option<syn::Expr> {
163 let mut visitor = ClosureFinder {
164 found_closure: None,
165 prefer_inner_blocks: true,
166 };
167 visitor.visit_expr(expr);
168 visitor.found_closure
169 }
170}
171
172struct ClosureFinder {
174 found_closure: Option<syn::Expr>,
175 prefer_inner_blocks: bool,
176}
177
178impl<'ast> Visit<'ast> for ClosureFinder {
179 fn visit_expr(&mut self, expr: &'ast syn::Expr) {
180 if self.found_closure.is_some() {
182 return;
183 }
184
185 match expr {
186 syn::Expr::Closure(_) => {
187 self.found_closure = Some(expr.clone());
188 }
189 syn::Expr::Block(block) if self.prefer_inner_blocks => {
190 for stmt in &block.block.stmts {
192 if let syn::Stmt::Expr(stmt_expr, _) = stmt
193 && let syn::Expr::Block(_) = stmt_expr
194 {
195 let mut inner_visitor = ClosureFinder {
197 found_closure: None,
198 prefer_inner_blocks: false, };
200 inner_visitor.visit_expr(stmt_expr);
201 if inner_visitor.found_closure.is_some() {
202 self.found_closure = Some(stmt_expr.clone());
204 return;
205 }
206 }
207 }
208
209 visit::visit_expr(self, expr);
211
212 if self.found_closure.is_some() {
215 }
217 }
218 _ => {
219 visit::visit_expr(self, expr);
221 }
222 }
223 }
224}
225
226#[derive(Clone, PartialEq, Eq, Hash)]
230pub struct DebugType(pub Box<syn::Type>);
231
232impl From<syn::Type> for DebugType {
233 fn from(t: syn::Type) -> Self {
234 Self(Box::new(t))
235 }
236}
237
238impl Deref for DebugType {
239 type Target = syn::Type;
240
241 fn deref(&self) -> &Self::Target {
242 &self.0
243 }
244}
245
246impl ToTokens for DebugType {
247 fn to_tokens(&self, tokens: &mut TokenStream) {
248 self.0.to_tokens(tokens);
249 }
250}
251
252impl Debug for DebugType {
253 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
254 write!(f, "{}", self.0.to_token_stream())
255 }
256}
257
258pub enum DebugInstantiate {
259 Building,
260 Finalized(Box<DebugInstantiateFinalized>),
261}
262
263#[cfg_attr(
264 not(feature = "build"),
265 expect(
266 dead_code,
267 reason = "sink, source unused without `feature = \"build\"`."
268 )
269)]
270pub struct DebugInstantiateFinalized {
271 sink: syn::Expr,
272 source: syn::Expr,
273 connect_fn: Option<Box<dyn FnOnce()>>,
274}
275
276impl From<DebugInstantiateFinalized> for DebugInstantiate {
277 fn from(f: DebugInstantiateFinalized) -> Self {
278 Self::Finalized(Box::new(f))
279 }
280}
281
282impl Debug for DebugInstantiate {
283 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
284 write!(f, "<network instantiate>")
285 }
286}
287
288impl Hash for DebugInstantiate {
289 fn hash<H: Hasher>(&self, _state: &mut H) {
290 }
292}
293
294impl Clone for DebugInstantiate {
295 fn clone(&self) -> Self {
296 match self {
297 DebugInstantiate::Building => DebugInstantiate::Building,
298 DebugInstantiate::Finalized(_) => {
299 panic!("DebugInstantiate::Finalized should not be cloned")
300 }
301 }
302 }
303}
304
305#[derive(Debug, Hash, Clone)]
314pub enum ClusterMembersState {
315 Uninit,
317 Stream(DebugExpr),
320 Tee(LocationId, LocationId),
324}
325
326#[derive(Debug, Hash, Clone)]
328pub enum HydroSource {
329 Stream(DebugExpr),
330 ExternalNetwork(),
331 Iter(DebugExpr),
332 Spin(),
333 ClusterMembers(LocationId, ClusterMembersState),
334 Embedded(syn::Ident),
335 EmbeddedSingleton(syn::Ident),
336}
337
338#[cfg(feature = "build")]
339pub trait DfirBuilder {
345 fn singleton_intermediates(&self) -> bool;
347
348 fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder;
350
351 fn batch(
352 &mut self,
353 in_ident: syn::Ident,
354 in_location: &LocationId,
355 in_kind: &CollectionKind,
356 out_ident: &syn::Ident,
357 out_location: &LocationId,
358 op_meta: &HydroIrOpMetadata,
359 );
360 fn yield_from_tick(
361 &mut self,
362 in_ident: syn::Ident,
363 in_location: &LocationId,
364 in_kind: &CollectionKind,
365 out_ident: &syn::Ident,
366 out_location: &LocationId,
367 );
368
369 fn begin_atomic(
370 &mut self,
371 in_ident: syn::Ident,
372 in_location: &LocationId,
373 in_kind: &CollectionKind,
374 out_ident: &syn::Ident,
375 out_location: &LocationId,
376 op_meta: &HydroIrOpMetadata,
377 );
378 fn end_atomic(
379 &mut self,
380 in_ident: syn::Ident,
381 in_location: &LocationId,
382 in_kind: &CollectionKind,
383 out_ident: &syn::Ident,
384 );
385
386 #[expect(clippy::too_many_arguments, reason = "TODO // internal")]
387 fn observe_nondet(
388 &mut self,
389 trusted: bool,
390 location: &LocationId,
391 in_ident: syn::Ident,
392 in_kind: &CollectionKind,
393 out_ident: &syn::Ident,
394 out_kind: &CollectionKind,
395 op_meta: &HydroIrOpMetadata,
396 );
397
398 #[expect(clippy::too_many_arguments, reason = "TODO")]
399 fn create_network(
400 &mut self,
401 from: &LocationId,
402 to: &LocationId,
403 input_ident: syn::Ident,
404 out_ident: &syn::Ident,
405 serialize: Option<&DebugExpr>,
406 sink: syn::Expr,
407 source: syn::Expr,
408 deserialize: Option<&DebugExpr>,
409 tag_id: usize,
410 networking_info: &crate::networking::NetworkingInfo,
411 );
412
413 fn create_external_source(
414 &mut self,
415 on: &LocationId,
416 source_expr: syn::Expr,
417 out_ident: &syn::Ident,
418 deserialize: Option<&DebugExpr>,
419 tag_id: usize,
420 );
421
422 fn create_external_output(
423 &mut self,
424 on: &LocationId,
425 sink_expr: syn::Expr,
426 input_ident: &syn::Ident,
427 serialize: Option<&DebugExpr>,
428 tag_id: usize,
429 );
430}
431
432#[cfg(feature = "build")]
433impl DfirBuilder for SecondaryMap<LocationKey, FlatGraphBuilder> {
434 fn singleton_intermediates(&self) -> bool {
435 false
436 }
437
438 fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder {
439 self.entry(location.root().key())
440 .expect("location was removed")
441 .or_default()
442 }
443
444 fn batch(
445 &mut self,
446 in_ident: syn::Ident,
447 in_location: &LocationId,
448 in_kind: &CollectionKind,
449 out_ident: &syn::Ident,
450 _out_location: &LocationId,
451 _op_meta: &HydroIrOpMetadata,
452 ) {
453 let builder = self.get_dfir_mut(in_location.root());
454 if in_kind.is_bounded()
455 && matches!(
456 in_kind,
457 CollectionKind::Singleton { .. }
458 | CollectionKind::Optional { .. }
459 | CollectionKind::KeyedSingleton { .. }
460 )
461 {
462 assert!(in_location.is_top_level());
463 builder.add_dfir(
464 parse_quote! {
465 #out_ident = #in_ident -> persist::<'static>();
466 },
467 None,
468 None,
469 );
470 } else {
471 builder.add_dfir(
472 parse_quote! {
473 #out_ident = #in_ident;
474 },
475 None,
476 None,
477 );
478 }
479 }
480
481 fn yield_from_tick(
482 &mut self,
483 in_ident: syn::Ident,
484 in_location: &LocationId,
485 _in_kind: &CollectionKind,
486 out_ident: &syn::Ident,
487 _out_location: &LocationId,
488 ) {
489 let builder = self.get_dfir_mut(in_location.root());
490 builder.add_dfir(
491 parse_quote! {
492 #out_ident = #in_ident;
493 },
494 None,
495 None,
496 );
497 }
498
499 fn begin_atomic(
500 &mut self,
501 in_ident: syn::Ident,
502 in_location: &LocationId,
503 _in_kind: &CollectionKind,
504 out_ident: &syn::Ident,
505 _out_location: &LocationId,
506 _op_meta: &HydroIrOpMetadata,
507 ) {
508 let builder = self.get_dfir_mut(in_location.root());
509 builder.add_dfir(
510 parse_quote! {
511 #out_ident = #in_ident;
512 },
513 None,
514 None,
515 );
516 }
517
518 fn end_atomic(
519 &mut self,
520 in_ident: syn::Ident,
521 in_location: &LocationId,
522 _in_kind: &CollectionKind,
523 out_ident: &syn::Ident,
524 ) {
525 let builder = self.get_dfir_mut(in_location.root());
526 builder.add_dfir(
527 parse_quote! {
528 #out_ident = #in_ident;
529 },
530 None,
531 None,
532 );
533 }
534
535 fn observe_nondet(
536 &mut self,
537 _trusted: bool,
538 location: &LocationId,
539 in_ident: syn::Ident,
540 _in_kind: &CollectionKind,
541 out_ident: &syn::Ident,
542 _out_kind: &CollectionKind,
543 _op_meta: &HydroIrOpMetadata,
544 ) {
545 let builder = self.get_dfir_mut(location);
546 builder.add_dfir(
547 parse_quote! {
548 #out_ident = #in_ident;
549 },
550 None,
551 None,
552 );
553 }
554
555 fn create_network(
556 &mut self,
557 from: &LocationId,
558 to: &LocationId,
559 input_ident: syn::Ident,
560 out_ident: &syn::Ident,
561 serialize: Option<&DebugExpr>,
562 sink: syn::Expr,
563 source: syn::Expr,
564 deserialize: Option<&DebugExpr>,
565 tag_id: usize,
566 _networking_info: &crate::networking::NetworkingInfo,
567 ) {
568 let sender_builder = self.get_dfir_mut(from);
569 if let Some(serialize_pipeline) = serialize {
570 sender_builder.add_dfir(
571 parse_quote! {
572 #input_ident -> map(#serialize_pipeline) -> dest_sink(#sink);
573 },
574 None,
575 Some(&format!("send{}", tag_id)),
577 );
578 } else {
579 sender_builder.add_dfir(
580 parse_quote! {
581 #input_ident -> dest_sink(#sink);
582 },
583 None,
584 Some(&format!("send{}", tag_id)),
585 );
586 }
587
588 let receiver_builder = self.get_dfir_mut(to);
589 if let Some(deserialize_pipeline) = deserialize {
590 receiver_builder.add_dfir(
591 parse_quote! {
592 #out_ident = source_stream(#source) -> map(#deserialize_pipeline);
593 },
594 None,
595 Some(&format!("recv{}", tag_id)),
596 );
597 } else {
598 receiver_builder.add_dfir(
599 parse_quote! {
600 #out_ident = source_stream(#source);
601 },
602 None,
603 Some(&format!("recv{}", tag_id)),
604 );
605 }
606 }
607
608 fn create_external_source(
609 &mut self,
610 on: &LocationId,
611 source_expr: syn::Expr,
612 out_ident: &syn::Ident,
613 deserialize: Option<&DebugExpr>,
614 tag_id: usize,
615 ) {
616 let receiver_builder = self.get_dfir_mut(on);
617 if let Some(deserialize_pipeline) = deserialize {
618 receiver_builder.add_dfir(
619 parse_quote! {
620 #out_ident = source_stream(#source_expr) -> map(#deserialize_pipeline);
621 },
622 None,
623 Some(&format!("recv{}", tag_id)),
624 );
625 } else {
626 receiver_builder.add_dfir(
627 parse_quote! {
628 #out_ident = source_stream(#source_expr);
629 },
630 None,
631 Some(&format!("recv{}", tag_id)),
632 );
633 }
634 }
635
636 fn create_external_output(
637 &mut self,
638 on: &LocationId,
639 sink_expr: syn::Expr,
640 input_ident: &syn::Ident,
641 serialize: Option<&DebugExpr>,
642 tag_id: usize,
643 ) {
644 let sender_builder = self.get_dfir_mut(on);
645 if let Some(serialize_fn) = serialize {
646 sender_builder.add_dfir(
647 parse_quote! {
648 #input_ident -> map(#serialize_fn) -> dest_sink(#sink_expr);
649 },
650 None,
651 Some(&format!("send{}", tag_id)),
653 );
654 } else {
655 sender_builder.add_dfir(
656 parse_quote! {
657 #input_ident -> dest_sink(#sink_expr);
658 },
659 None,
660 Some(&format!("send{}", tag_id)),
661 );
662 }
663 }
664}
665
666#[cfg(feature = "build")]
667pub enum BuildersOrCallback<'a, L, N>
668where
669 L: FnMut(&mut HydroRoot, &mut usize),
670 N: FnMut(&mut HydroNode, &mut usize),
671{
672 Builders(&'a mut dyn DfirBuilder),
673 Callback(L, N),
674}
675
676#[derive(Debug, Hash)]
680pub enum HydroRoot {
681 ForEach {
682 f: DebugExpr,
683 input: Box<HydroNode>,
684 op_metadata: HydroIrOpMetadata,
685 },
686 SendExternal {
687 to_external_key: LocationKey,
688 to_port_id: ExternalPortId,
689 to_many: bool,
690 unpaired: bool,
691 serialize_fn: Option<DebugExpr>,
692 instantiate_fn: DebugInstantiate,
693 input: Box<HydroNode>,
694 op_metadata: HydroIrOpMetadata,
695 },
696 DestSink {
697 sink: DebugExpr,
698 input: Box<HydroNode>,
699 op_metadata: HydroIrOpMetadata,
700 },
701 CycleSink {
702 cycle_id: CycleId,
703 input: Box<HydroNode>,
704 op_metadata: HydroIrOpMetadata,
705 },
706 EmbeddedOutput {
707 ident: syn::Ident,
708 input: Box<HydroNode>,
709 op_metadata: HydroIrOpMetadata,
710 },
711 Null {
712 input: Box<HydroNode>,
713 op_metadata: HydroIrOpMetadata,
714 },
715}
716
717impl HydroRoot {
718 #[cfg(feature = "build")]
719 #[expect(clippy::too_many_arguments, reason = "TODO(internal)")]
720 pub fn compile_network<'a, D>(
721 &mut self,
722 extra_stmts: &mut SparseSecondaryMap<LocationKey, Vec<syn::Stmt>>,
723 seen_tees: &mut SeenSharedNodes,
724 seen_cluster_members: &mut HashSet<(LocationId, LocationId)>,
725 processes: &SparseSecondaryMap<LocationKey, D::Process>,
726 clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
727 externals: &SparseSecondaryMap<LocationKey, D::External>,
728 env: &mut D::InstantiateEnv,
729 ) where
730 D: Deploy<'a>,
731 {
732 let refcell_extra_stmts = RefCell::new(extra_stmts);
733 let refcell_env = RefCell::new(env);
734 let refcell_seen_cluster_members = RefCell::new(seen_cluster_members);
735 self.transform_bottom_up(
736 &mut |l| {
737 if let HydroRoot::SendExternal {
738 input,
739 to_external_key,
740 to_port_id,
741 to_many,
742 unpaired,
743 instantiate_fn,
744 ..
745 } = l
746 {
747 let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
748 DebugInstantiate::Building => {
749 let to_node = externals
750 .get(*to_external_key)
751 .unwrap_or_else(|| {
752 panic!("A external used in the graph was not instantiated: {}", to_external_key)
753 })
754 .clone();
755
756 match input.metadata().location_id.root() {
757 &LocationId::Process(process_key) => {
758 if *to_many {
759 (
760 (
761 D::e2o_many_sink(format!("{}_{}", *to_external_key, *to_port_id)),
762 parse_quote!(DUMMY),
763 ),
764 Box::new(|| {}) as Box<dyn FnOnce()>,
765 )
766 } else {
767 let from_node = processes
768 .get(process_key)
769 .unwrap_or_else(|| {
770 panic!("A process used in the graph was not instantiated: {}", process_key)
771 })
772 .clone();
773
774 let sink_port = from_node.next_port();
775 let source_port = to_node.next_port();
776
777 if *unpaired {
778 use stageleft::quote_type;
779 use tokio_util::codec::LengthDelimitedCodec;
780
781 to_node.register(*to_port_id, source_port.clone());
782
783 let _ = D::e2o_source(
784 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
785 &to_node, &source_port,
786 &from_node, &sink_port,
787 "e_type::<LengthDelimitedCodec>(),
788 format!("{}_{}", *to_external_key, *to_port_id)
789 );
790 }
791
792 (
793 (
794 D::o2e_sink(
795 &from_node,
796 &sink_port,
797 &to_node,
798 &source_port,
799 format!("{}_{}", *to_external_key, *to_port_id)
800 ),
801 parse_quote!(DUMMY),
802 ),
803 if *unpaired {
804 D::e2o_connect(
805 &to_node,
806 &source_port,
807 &from_node,
808 &sink_port,
809 *to_many,
810 NetworkHint::Auto,
811 )
812 } else {
813 Box::new(|| {}) as Box<dyn FnOnce()>
814 },
815 )
816 }
817 }
818 LocationId::Cluster(cluster_key) => {
819 let from_node = clusters
820 .get(*cluster_key)
821 .unwrap_or_else(|| {
822 panic!("A cluster used in the graph was not instantiated: {}", cluster_key)
823 })
824 .clone();
825
826 let sink_port = from_node.next_port();
827 let source_port = to_node.next_port();
828
829 if *unpaired {
830 to_node.register(*to_port_id, source_port.clone());
831 }
832
833 (
834 (
835 D::m2e_sink(
836 &from_node,
837 &sink_port,
838 &to_node,
839 &source_port,
840 format!("{}_{}", *to_external_key, *to_port_id)
841 ),
842 parse_quote!(DUMMY),
843 ),
844 Box::new(|| {}) as Box<dyn FnOnce()>,
845 )
846 }
847 _ => panic!()
848 }
849 },
850
851 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
852 };
853
854 *instantiate_fn = DebugInstantiateFinalized {
855 sink: sink_expr,
856 source: source_expr,
857 connect_fn: Some(connect_fn),
858 }
859 .into();
860 } else if let HydroRoot::EmbeddedOutput { ident, input, .. } = l {
861 let element_type = match &input.metadata().collection_kind {
862 CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
863 _ => panic!("Embedded output must have Stream collection kind"),
864 };
865 let location_key = match input.metadata().location_id.root() {
866 LocationId::Process(key) | LocationId::Cluster(key) => *key,
867 _ => panic!("Embedded output must be on a process or cluster"),
868 };
869 D::register_embedded_output(
870 &mut refcell_env.borrow_mut(),
871 location_key,
872 ident,
873 &element_type,
874 );
875 }
876 },
877 &mut |n| {
878 if let HydroNode::Network {
879 name,
880 networking_info,
881 input,
882 instantiate_fn,
883 metadata,
884 ..
885 } = n
886 {
887 let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
888 DebugInstantiate::Building => instantiate_network::<D>(
889 &mut refcell_env.borrow_mut(),
890 input.metadata().location_id.root(),
891 metadata.location_id.root(),
892 processes,
893 clusters,
894 name.as_deref(),
895 networking_info,
896 ),
897
898 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
899 };
900
901 *instantiate_fn = DebugInstantiateFinalized {
902 sink: sink_expr,
903 source: source_expr,
904 connect_fn: Some(connect_fn),
905 }
906 .into();
907 } else if let HydroNode::ExternalInput {
908 from_external_key,
909 from_port_id,
910 from_many,
911 codec_type,
912 port_hint,
913 instantiate_fn,
914 metadata,
915 ..
916 } = n
917 {
918 let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
919 DebugInstantiate::Building => {
920 let from_node = externals
921 .get(*from_external_key)
922 .unwrap_or_else(|| {
923 panic!(
924 "A external used in the graph was not instantiated: {}",
925 from_external_key,
926 )
927 })
928 .clone();
929
930 match metadata.location_id.root() {
931 &LocationId::Process(process_key) => {
932 let to_node = processes
933 .get(process_key)
934 .unwrap_or_else(|| {
935 panic!("A process used in the graph was not instantiated: {}", process_key)
936 })
937 .clone();
938
939 let sink_port = from_node.next_port();
940 let source_port = to_node.next_port();
941
942 from_node.register(*from_port_id, sink_port.clone());
943
944 (
945 (
946 parse_quote!(DUMMY),
947 if *from_many {
948 D::e2o_many_source(
949 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
950 &to_node, &source_port,
951 codec_type.0.as_ref(),
952 format!("{}_{}", *from_external_key, *from_port_id)
953 )
954 } else {
955 D::e2o_source(
956 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
957 &from_node, &sink_port,
958 &to_node, &source_port,
959 codec_type.0.as_ref(),
960 format!("{}_{}", *from_external_key, *from_port_id)
961 )
962 },
963 ),
964 D::e2o_connect(&from_node, &sink_port, &to_node, &source_port, *from_many, *port_hint),
965 )
966 }
967 LocationId::Cluster(cluster_key) => {
968 let to_node = clusters
969 .get(*cluster_key)
970 .unwrap_or_else(|| {
971 panic!("A cluster used in the graph was not instantiated: {}", cluster_key)
972 })
973 .clone();
974
975 let sink_port = from_node.next_port();
976 let source_port = to_node.next_port();
977
978 from_node.register(*from_port_id, sink_port.clone());
979
980 (
981 (
982 parse_quote!(DUMMY),
983 D::e2m_source(
984 refcell_extra_stmts.borrow_mut().entry(*cluster_key).expect("location was removed").or_default(),
985 &from_node, &sink_port,
986 &to_node, &source_port,
987 codec_type.0.as_ref(),
988 format!("{}_{}", *from_external_key, *from_port_id)
989 ),
990 ),
991 D::e2m_connect(&from_node, &sink_port, &to_node, &source_port, *port_hint),
992 )
993 }
994 _ => panic!()
995 }
996 },
997
998 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
999 };
1000
1001 *instantiate_fn = DebugInstantiateFinalized {
1002 sink: sink_expr,
1003 source: source_expr,
1004 connect_fn: Some(connect_fn),
1005 }
1006 .into();
1007 } else if let HydroNode::Source { source: HydroSource::Embedded(ident), metadata } = n {
1008 let element_type = match &metadata.collection_kind {
1009 CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
1010 _ => panic!("Embedded source must have Stream collection kind"),
1011 };
1012 let location_key = match metadata.location_id.root() {
1013 LocationId::Process(key) | LocationId::Cluster(key) => *key,
1014 _ => panic!("Embedded source must be on a process or cluster"),
1015 };
1016 D::register_embedded_stream_input(
1017 &mut refcell_env.borrow_mut(),
1018 location_key,
1019 ident,
1020 &element_type,
1021 );
1022 } else if let HydroNode::Source { source: HydroSource::EmbeddedSingleton(ident), metadata } = n {
1023 let element_type = match &metadata.collection_kind {
1024 CollectionKind::Singleton { element_type, .. } => element_type.0.as_ref().clone(),
1025 _ => panic!("EmbeddedSingleton source must have Singleton collection kind"),
1026 };
1027 let location_key = match metadata.location_id.root() {
1028 LocationId::Process(key) | LocationId::Cluster(key) => *key,
1029 _ => panic!("EmbeddedSingleton source must be on a process or cluster"),
1030 };
1031 D::register_embedded_singleton_input(
1032 &mut refcell_env.borrow_mut(),
1033 location_key,
1034 ident,
1035 &element_type,
1036 );
1037 } else if let HydroNode::Source { source: HydroSource::ClusterMembers(location_id, state), metadata } = n {
1038 match state {
1039 ClusterMembersState::Uninit => {
1040 let at_location = metadata.location_id.root().clone();
1041 let key = (at_location.clone(), LocationId::Cluster(location_id.key()));
1042 if refcell_seen_cluster_members.borrow_mut().insert(key) {
1043 let expr = stageleft::QuotedWithContext::splice_untyped_ctx(
1045 D::cluster_membership_stream(&mut refcell_env.borrow_mut(), &at_location, location_id),
1046 &(),
1047 );
1048 *state = ClusterMembersState::Stream(expr.into());
1049 } else {
1050 *state = ClusterMembersState::Tee(at_location, location_id.clone());
1052 }
1053 }
1054 ClusterMembersState::Stream(_) | ClusterMembersState::Tee(..) => {
1055 panic!("cluster members already finalized");
1056 }
1057 }
1058 }
1059 },
1060 seen_tees,
1061 false,
1062 );
1063 }
1064
1065 pub fn connect_network(&mut self, seen_tees: &mut SeenSharedNodes) {
1066 self.transform_bottom_up(
1067 &mut |l| {
1068 if let HydroRoot::SendExternal { instantiate_fn, .. } = l {
1069 match instantiate_fn {
1070 DebugInstantiate::Building => panic!("network not built"),
1071
1072 DebugInstantiate::Finalized(finalized) => {
1073 (finalized.connect_fn.take().unwrap())();
1074 }
1075 }
1076 }
1077 },
1078 &mut |n| {
1079 if let HydroNode::Network { instantiate_fn, .. }
1080 | HydroNode::ExternalInput { instantiate_fn, .. } = n
1081 {
1082 match instantiate_fn {
1083 DebugInstantiate::Building => panic!("network not built"),
1084
1085 DebugInstantiate::Finalized(finalized) => {
1086 (finalized.connect_fn.take().unwrap())();
1087 }
1088 }
1089 }
1090 },
1091 seen_tees,
1092 false,
1093 );
1094 }
1095
1096 pub fn transform_bottom_up(
1097 &mut self,
1098 transform_root: &mut impl FnMut(&mut HydroRoot),
1099 transform_node: &mut impl FnMut(&mut HydroNode),
1100 seen_tees: &mut SeenSharedNodes,
1101 check_well_formed: bool,
1102 ) {
1103 self.transform_children(
1104 |n, s| n.transform_bottom_up(transform_node, s, check_well_formed),
1105 seen_tees,
1106 );
1107
1108 transform_root(self);
1109 }
1110
1111 pub fn transform_children(
1112 &mut self,
1113 mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
1114 seen_tees: &mut SeenSharedNodes,
1115 ) {
1116 match self {
1117 HydroRoot::ForEach { input, .. }
1118 | HydroRoot::SendExternal { input, .. }
1119 | HydroRoot::DestSink { input, .. }
1120 | HydroRoot::CycleSink { input, .. }
1121 | HydroRoot::EmbeddedOutput { input, .. }
1122 | HydroRoot::Null { input, .. } => {
1123 transform(input, seen_tees);
1124 }
1125 }
1126 }
1127
1128 pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroRoot {
1129 match self {
1130 HydroRoot::ForEach {
1131 f,
1132 input,
1133 op_metadata,
1134 } => HydroRoot::ForEach {
1135 f: f.clone(),
1136 input: Box::new(input.deep_clone(seen_tees)),
1137 op_metadata: op_metadata.clone(),
1138 },
1139 HydroRoot::SendExternal {
1140 to_external_key,
1141 to_port_id,
1142 to_many,
1143 unpaired,
1144 serialize_fn,
1145 instantiate_fn,
1146 input,
1147 op_metadata,
1148 } => HydroRoot::SendExternal {
1149 to_external_key: *to_external_key,
1150 to_port_id: *to_port_id,
1151 to_many: *to_many,
1152 unpaired: *unpaired,
1153 serialize_fn: serialize_fn.clone(),
1154 instantiate_fn: instantiate_fn.clone(),
1155 input: Box::new(input.deep_clone(seen_tees)),
1156 op_metadata: op_metadata.clone(),
1157 },
1158 HydroRoot::DestSink {
1159 sink,
1160 input,
1161 op_metadata,
1162 } => HydroRoot::DestSink {
1163 sink: sink.clone(),
1164 input: Box::new(input.deep_clone(seen_tees)),
1165 op_metadata: op_metadata.clone(),
1166 },
1167 HydroRoot::CycleSink {
1168 cycle_id,
1169 input,
1170 op_metadata,
1171 } => HydroRoot::CycleSink {
1172 cycle_id: *cycle_id,
1173 input: Box::new(input.deep_clone(seen_tees)),
1174 op_metadata: op_metadata.clone(),
1175 },
1176 HydroRoot::EmbeddedOutput {
1177 ident,
1178 input,
1179 op_metadata,
1180 } => HydroRoot::EmbeddedOutput {
1181 ident: ident.clone(),
1182 input: Box::new(input.deep_clone(seen_tees)),
1183 op_metadata: op_metadata.clone(),
1184 },
1185 HydroRoot::Null { input, op_metadata } => HydroRoot::Null {
1186 input: Box::new(input.deep_clone(seen_tees)),
1187 op_metadata: op_metadata.clone(),
1188 },
1189 }
1190 }
1191
1192 #[cfg(feature = "build")]
1193 pub fn emit(
1194 &mut self,
1195 graph_builders: &mut dyn DfirBuilder,
1196 seen_tees: &mut SeenSharedNodes,
1197 built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
1198 next_stmt_id: &mut usize,
1199 ) {
1200 self.emit_core(
1201 &mut BuildersOrCallback::<
1202 fn(&mut HydroRoot, &mut usize),
1203 fn(&mut HydroNode, &mut usize),
1204 >::Builders(graph_builders),
1205 seen_tees,
1206 built_tees,
1207 next_stmt_id,
1208 );
1209 }
1210
1211 #[cfg(feature = "build")]
1212 pub fn emit_core(
1213 &mut self,
1214 builders_or_callback: &mut BuildersOrCallback<
1215 impl FnMut(&mut HydroRoot, &mut usize),
1216 impl FnMut(&mut HydroNode, &mut usize),
1217 >,
1218 seen_tees: &mut SeenSharedNodes,
1219 built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
1220 next_stmt_id: &mut usize,
1221 ) {
1222 match self {
1223 HydroRoot::ForEach { f, input, .. } => {
1224 let input_ident =
1225 input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1226
1227 match builders_or_callback {
1228 BuildersOrCallback::Builders(graph_builders) => {
1229 graph_builders
1230 .get_dfir_mut(&input.metadata().location_id)
1231 .add_dfir(
1232 parse_quote! {
1233 #input_ident -> for_each(#f);
1234 },
1235 None,
1236 Some(&next_stmt_id.to_string()),
1237 );
1238 }
1239 BuildersOrCallback::Callback(leaf_callback, _) => {
1240 leaf_callback(self, next_stmt_id);
1241 }
1242 }
1243
1244 *next_stmt_id += 1;
1245 }
1246
1247 HydroRoot::SendExternal {
1248 serialize_fn,
1249 instantiate_fn,
1250 input,
1251 ..
1252 } => {
1253 let input_ident =
1254 input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1255
1256 match builders_or_callback {
1257 BuildersOrCallback::Builders(graph_builders) => {
1258 let (sink_expr, _) = match instantiate_fn {
1259 DebugInstantiate::Building => (
1260 syn::parse_quote!(DUMMY_SINK),
1261 syn::parse_quote!(DUMMY_SOURCE),
1262 ),
1263
1264 DebugInstantiate::Finalized(finalized) => {
1265 (finalized.sink.clone(), finalized.source.clone())
1266 }
1267 };
1268
1269 graph_builders.create_external_output(
1270 &input.metadata().location_id,
1271 sink_expr,
1272 &input_ident,
1273 serialize_fn.as_ref(),
1274 *next_stmt_id,
1275 );
1276 }
1277 BuildersOrCallback::Callback(leaf_callback, _) => {
1278 leaf_callback(self, next_stmt_id);
1279 }
1280 }
1281
1282 *next_stmt_id += 1;
1283 }
1284
1285 HydroRoot::DestSink { sink, input, .. } => {
1286 let input_ident =
1287 input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1288
1289 match builders_or_callback {
1290 BuildersOrCallback::Builders(graph_builders) => {
1291 graph_builders
1292 .get_dfir_mut(&input.metadata().location_id)
1293 .add_dfir(
1294 parse_quote! {
1295 #input_ident -> dest_sink(#sink);
1296 },
1297 None,
1298 Some(&next_stmt_id.to_string()),
1299 );
1300 }
1301 BuildersOrCallback::Callback(leaf_callback, _) => {
1302 leaf_callback(self, next_stmt_id);
1303 }
1304 }
1305
1306 *next_stmt_id += 1;
1307 }
1308
1309 HydroRoot::CycleSink {
1310 cycle_id, input, ..
1311 } => {
1312 let input_ident =
1313 input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1314
1315 match builders_or_callback {
1316 BuildersOrCallback::Builders(graph_builders) => {
1317 let elem_type: syn::Type = match &input.metadata().collection_kind {
1318 CollectionKind::KeyedSingleton {
1319 key_type,
1320 value_type,
1321 ..
1322 }
1323 | CollectionKind::KeyedStream {
1324 key_type,
1325 value_type,
1326 ..
1327 } => {
1328 parse_quote!((#key_type, #value_type))
1329 }
1330 CollectionKind::Stream { element_type, .. }
1331 | CollectionKind::Singleton { element_type, .. }
1332 | CollectionKind::Optional { element_type, .. } => {
1333 parse_quote!(#element_type)
1334 }
1335 };
1336
1337 let cycle_id_ident = cycle_id.as_ident();
1338 graph_builders
1339 .get_dfir_mut(&input.metadata().location_id)
1340 .add_dfir(
1341 parse_quote! {
1342 #cycle_id_ident = #input_ident -> identity::<#elem_type>();
1343 },
1344 None,
1345 None,
1346 );
1347 }
1348 BuildersOrCallback::Callback(_, _) => {}
1350 }
1351 }
1352
1353 HydroRoot::EmbeddedOutput { ident, input, .. } => {
1354 let input_ident =
1355 input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1356
1357 match builders_or_callback {
1358 BuildersOrCallback::Builders(graph_builders) => {
1359 graph_builders
1360 .get_dfir_mut(&input.metadata().location_id)
1361 .add_dfir(
1362 parse_quote! {
1363 #input_ident -> for_each(&mut #ident);
1364 },
1365 None,
1366 Some(&next_stmt_id.to_string()),
1367 );
1368 }
1369 BuildersOrCallback::Callback(leaf_callback, _) => {
1370 leaf_callback(self, next_stmt_id);
1371 }
1372 }
1373
1374 *next_stmt_id += 1;
1375 }
1376
1377 HydroRoot::Null { input, .. } => {
1378 let input_ident =
1379 input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1380
1381 match builders_or_callback {
1382 BuildersOrCallback::Builders(graph_builders) => {
1383 graph_builders
1384 .get_dfir_mut(&input.metadata().location_id)
1385 .add_dfir(
1386 parse_quote! {
1387 #input_ident -> for_each(|_| {});
1388 },
1389 None,
1390 Some(&next_stmt_id.to_string()),
1391 );
1392 }
1393 BuildersOrCallback::Callback(leaf_callback, _) => {
1394 leaf_callback(self, next_stmt_id);
1395 }
1396 }
1397
1398 *next_stmt_id += 1;
1399 }
1400 }
1401 }
1402
1403 pub fn op_metadata(&self) -> &HydroIrOpMetadata {
1404 match self {
1405 HydroRoot::ForEach { op_metadata, .. }
1406 | HydroRoot::SendExternal { op_metadata, .. }
1407 | HydroRoot::DestSink { op_metadata, .. }
1408 | HydroRoot::CycleSink { op_metadata, .. }
1409 | HydroRoot::EmbeddedOutput { op_metadata, .. }
1410 | HydroRoot::Null { op_metadata, .. } => op_metadata,
1411 }
1412 }
1413
1414 pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
1415 match self {
1416 HydroRoot::ForEach { op_metadata, .. }
1417 | HydroRoot::SendExternal { op_metadata, .. }
1418 | HydroRoot::DestSink { op_metadata, .. }
1419 | HydroRoot::CycleSink { op_metadata, .. }
1420 | HydroRoot::EmbeddedOutput { op_metadata, .. }
1421 | HydroRoot::Null { op_metadata, .. } => op_metadata,
1422 }
1423 }
1424
1425 pub fn input(&self) -> &HydroNode {
1426 match self {
1427 HydroRoot::ForEach { input, .. }
1428 | HydroRoot::SendExternal { input, .. }
1429 | HydroRoot::DestSink { input, .. }
1430 | HydroRoot::CycleSink { input, .. }
1431 | HydroRoot::EmbeddedOutput { input, .. }
1432 | HydroRoot::Null { input, .. } => input,
1433 }
1434 }
1435
1436 pub fn input_metadata(&self) -> &HydroIrMetadata {
1437 self.input().metadata()
1438 }
1439
1440 pub fn print_root(&self) -> String {
1441 match self {
1442 HydroRoot::ForEach { f, .. } => format!("ForEach({:?})", f),
1443 HydroRoot::SendExternal { .. } => "SendExternal".to_owned(),
1444 HydroRoot::DestSink { sink, .. } => format!("DestSink({:?})", sink),
1445 HydroRoot::CycleSink { cycle_id, .. } => format!("CycleSink({})", cycle_id),
1446 HydroRoot::EmbeddedOutput { ident, .. } => {
1447 format!("EmbeddedOutput({})", ident)
1448 }
1449 HydroRoot::Null { .. } => "Null".to_owned(),
1450 }
1451 }
1452
1453 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
1454 match self {
1455 HydroRoot::ForEach { f, .. } | HydroRoot::DestSink { sink: f, .. } => {
1456 transform(f);
1457 }
1458 HydroRoot::SendExternal { .. }
1459 | HydroRoot::CycleSink { .. }
1460 | HydroRoot::EmbeddedOutput { .. }
1461 | HydroRoot::Null { .. } => {}
1462 }
1463 }
1464}
1465
1466#[cfg(feature = "build")]
1467fn tick_of(loc: &LocationId) -> Option<ClockId> {
1468 match loc {
1469 LocationId::Tick(id, _) => Some(*id),
1470 LocationId::Atomic(inner) => tick_of(inner),
1471 _ => None,
1472 }
1473}
1474
1475#[cfg(feature = "build")]
1476fn remap_location(loc: &mut LocationId, uf: &mut HashMap<ClockId, ClockId>) {
1477 match loc {
1478 LocationId::Tick(id, inner) => {
1479 *id = uf_find(uf, *id);
1480 remap_location(inner, uf);
1481 }
1482 LocationId::Atomic(inner) => {
1483 remap_location(inner, uf);
1484 }
1485 LocationId::Process(_) | LocationId::Cluster(_) => {}
1486 }
1487}
1488
1489#[cfg(feature = "build")]
1490fn uf_find(parent: &mut HashMap<ClockId, ClockId>, x: ClockId) -> ClockId {
1491 let p = *parent.get(&x).unwrap_or(&x);
1492 if p == x {
1493 return x;
1494 }
1495 let root = uf_find(parent, p);
1496 parent.insert(x, root);
1497 root
1498}
1499
1500#[cfg(feature = "build")]
1501fn uf_union(parent: &mut HashMap<ClockId, ClockId>, a: ClockId, b: ClockId) {
1502 let ra = uf_find(parent, a);
1503 let rb = uf_find(parent, b);
1504 if ra != rb {
1505 parent.insert(ra, rb);
1506 }
1507}
1508
1509#[cfg(feature = "build")]
1513pub fn unify_atomic_ticks(ir: &mut [HydroRoot]) {
1514 let mut uf: HashMap<ClockId, ClockId> = HashMap::new();
1515
1516 transform_bottom_up(
1518 ir,
1519 &mut |_| {},
1520 &mut |node: &mut HydroNode| {
1521 if let HydroNode::Batch { inner, metadata } | HydroNode::YieldConcat { inner, metadata } =
1522 node
1523 && let (Some(a), Some(b)) = (
1524 tick_of(&inner.metadata().location_id),
1525 tick_of(&metadata.location_id),
1526 )
1527 {
1528 uf_union(&mut uf, a, b);
1529 }
1530 },
1531 false,
1532 );
1533
1534 transform_bottom_up(
1536 ir,
1537 &mut |_| {},
1538 &mut |node: &mut HydroNode| {
1539 remap_location(&mut node.metadata_mut().location_id, &mut uf);
1540 },
1541 false,
1542 );
1543}
1544
1545#[cfg(feature = "build")]
1546pub fn emit(ir: &mut Vec<HydroRoot>) -> SecondaryMap<LocationKey, FlatGraphBuilder> {
1547 let mut builders = SecondaryMap::new();
1548 let mut seen_tees = HashMap::new();
1549 let mut built_tees = HashMap::new();
1550 let mut next_stmt_id = 0;
1551 for leaf in ir {
1552 leaf.emit(
1553 &mut builders,
1554 &mut seen_tees,
1555 &mut built_tees,
1556 &mut next_stmt_id,
1557 );
1558 }
1559 builders
1560}
1561
1562#[cfg(feature = "build")]
1563pub fn traverse_dfir(
1564 ir: &mut [HydroRoot],
1565 transform_root: impl FnMut(&mut HydroRoot, &mut usize),
1566 transform_node: impl FnMut(&mut HydroNode, &mut usize),
1567) {
1568 let mut seen_tees = HashMap::new();
1569 let mut built_tees = HashMap::new();
1570 let mut next_stmt_id = 0;
1571 let mut callback = BuildersOrCallback::Callback(transform_root, transform_node);
1572 ir.iter_mut().for_each(|leaf| {
1573 leaf.emit_core(
1574 &mut callback,
1575 &mut seen_tees,
1576 &mut built_tees,
1577 &mut next_stmt_id,
1578 );
1579 });
1580}
1581
1582pub fn transform_bottom_up(
1583 ir: &mut [HydroRoot],
1584 transform_root: &mut impl FnMut(&mut HydroRoot),
1585 transform_node: &mut impl FnMut(&mut HydroNode),
1586 check_well_formed: bool,
1587) {
1588 let mut seen_tees = HashMap::new();
1589 ir.iter_mut().for_each(|leaf| {
1590 leaf.transform_bottom_up(
1591 transform_root,
1592 transform_node,
1593 &mut seen_tees,
1594 check_well_formed,
1595 );
1596 });
1597}
1598
1599pub fn deep_clone(ir: &[HydroRoot]) -> Vec<HydroRoot> {
1600 let mut seen_tees = HashMap::new();
1601 ir.iter()
1602 .map(|leaf| leaf.deep_clone(&mut seen_tees))
1603 .collect()
1604}
1605
1606type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
1607thread_local! {
1608 static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
1609}
1610
1611pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
1612 PRINTED_TEES.with(|printed_tees| {
1613 let mut printed_tees_mut = printed_tees.borrow_mut();
1614 *printed_tees_mut = Some((0, HashMap::new()));
1615 drop(printed_tees_mut);
1616
1617 let ret = f();
1618
1619 let mut printed_tees_mut = printed_tees.borrow_mut();
1620 *printed_tees_mut = None;
1621
1622 ret
1623 })
1624}
1625
1626pub struct SharedNode(pub Rc<RefCell<HydroNode>>);
1627
1628impl SharedNode {
1629 pub fn as_ptr(&self) -> *const RefCell<HydroNode> {
1630 Rc::as_ptr(&self.0)
1631 }
1632}
1633
1634impl Debug for SharedNode {
1635 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1636 PRINTED_TEES.with(|printed_tees| {
1637 let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
1638 let printed_tees_mut = printed_tees_mut_borrow.as_mut();
1639
1640 if let Some(printed_tees_mut) = printed_tees_mut {
1641 if let Some(existing) = printed_tees_mut
1642 .1
1643 .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
1644 {
1645 write!(f, "<shared {}>", existing)
1646 } else {
1647 let next_id = printed_tees_mut.0;
1648 printed_tees_mut.0 += 1;
1649 printed_tees_mut
1650 .1
1651 .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
1652 drop(printed_tees_mut_borrow);
1653 write!(f, "<shared {}>: ", next_id)?;
1654 Debug::fmt(&self.0.borrow(), f)
1655 }
1656 } else {
1657 drop(printed_tees_mut_borrow);
1658 write!(f, "<shared>: ")?;
1659 Debug::fmt(&self.0.borrow(), f)
1660 }
1661 })
1662 }
1663}
1664
1665impl Hash for SharedNode {
1666 fn hash<H: Hasher>(&self, state: &mut H) {
1667 self.0.borrow_mut().hash(state);
1668 }
1669}
1670
1671#[derive(Clone, PartialEq, Eq, Debug)]
1672pub enum BoundKind {
1673 Unbounded,
1674 Bounded,
1675}
1676
1677#[derive(Clone, PartialEq, Eq, Debug)]
1678pub enum StreamOrder {
1679 NoOrder,
1680 TotalOrder,
1681}
1682
1683#[derive(Clone, PartialEq, Eq, Debug)]
1684pub enum StreamRetry {
1685 AtLeastOnce,
1686 ExactlyOnce,
1687}
1688
1689#[derive(Clone, PartialEq, Eq, Debug)]
1690pub enum KeyedSingletonBoundKind {
1691 Unbounded,
1692 BoundedValue,
1693 Bounded,
1694}
1695
1696#[derive(Clone, PartialEq, Eq, Debug)]
1697pub enum CollectionKind {
1698 Stream {
1699 bound: BoundKind,
1700 order: StreamOrder,
1701 retry: StreamRetry,
1702 element_type: DebugType,
1703 },
1704 Singleton {
1705 bound: BoundKind,
1706 element_type: DebugType,
1707 },
1708 Optional {
1709 bound: BoundKind,
1710 element_type: DebugType,
1711 },
1712 KeyedStream {
1713 bound: BoundKind,
1714 value_order: StreamOrder,
1715 value_retry: StreamRetry,
1716 key_type: DebugType,
1717 value_type: DebugType,
1718 },
1719 KeyedSingleton {
1720 bound: KeyedSingletonBoundKind,
1721 key_type: DebugType,
1722 value_type: DebugType,
1723 },
1724}
1725
1726impl CollectionKind {
1727 pub fn is_bounded(&self) -> bool {
1728 matches!(
1729 self,
1730 CollectionKind::Stream {
1731 bound: BoundKind::Bounded,
1732 ..
1733 } | CollectionKind::Singleton {
1734 bound: BoundKind::Bounded,
1735 ..
1736 } | CollectionKind::Optional {
1737 bound: BoundKind::Bounded,
1738 ..
1739 } | CollectionKind::KeyedStream {
1740 bound: BoundKind::Bounded,
1741 ..
1742 } | CollectionKind::KeyedSingleton {
1743 bound: KeyedSingletonBoundKind::Bounded,
1744 ..
1745 }
1746 )
1747 }
1748}
1749
1750#[derive(Clone)]
1751pub struct HydroIrMetadata {
1752 pub location_id: LocationId,
1753 pub collection_kind: CollectionKind,
1754 pub cardinality: Option<usize>,
1755 pub tag: Option<String>,
1756 pub op: HydroIrOpMetadata,
1757}
1758
1759impl Hash for HydroIrMetadata {
1761 fn hash<H: Hasher>(&self, _: &mut H) {}
1762}
1763
1764impl PartialEq for HydroIrMetadata {
1765 fn eq(&self, _: &Self) -> bool {
1766 true
1767 }
1768}
1769
1770impl Eq for HydroIrMetadata {}
1771
1772impl Debug for HydroIrMetadata {
1773 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1774 f.debug_struct("HydroIrMetadata")
1775 .field("location_id", &self.location_id)
1776 .field("collection_kind", &self.collection_kind)
1777 .finish()
1778 }
1779}
1780
1781#[derive(Clone)]
1784pub struct HydroIrOpMetadata {
1785 pub backtrace: Backtrace,
1786 pub cpu_usage: Option<f64>,
1787 pub network_recv_cpu_usage: Option<f64>,
1788 pub id: Option<usize>,
1789}
1790
1791impl HydroIrOpMetadata {
1792 #[expect(
1793 clippy::new_without_default,
1794 reason = "explicit calls to new ensure correct backtrace bounds"
1795 )]
1796 pub fn new() -> HydroIrOpMetadata {
1797 Self::new_with_skip(1)
1798 }
1799
1800 fn new_with_skip(skip_count: usize) -> HydroIrOpMetadata {
1801 HydroIrOpMetadata {
1802 backtrace: Backtrace::get_backtrace(2 + skip_count),
1803 cpu_usage: None,
1804 network_recv_cpu_usage: None,
1805 id: None,
1806 }
1807 }
1808}
1809
1810impl Debug for HydroIrOpMetadata {
1811 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1812 f.debug_struct("HydroIrOpMetadata").finish()
1813 }
1814}
1815
1816impl Hash for HydroIrOpMetadata {
1817 fn hash<H: Hasher>(&self, _: &mut H) {}
1818}
1819
1820#[derive(Debug, Hash)]
1823pub enum HydroNode {
1824 Placeholder,
1825
1826 Cast {
1834 inner: Box<HydroNode>,
1835 metadata: HydroIrMetadata,
1836 },
1837
1838 ObserveNonDet {
1844 inner: Box<HydroNode>,
1845 trusted: bool, metadata: HydroIrMetadata,
1847 },
1848
1849 Source {
1850 source: HydroSource,
1851 metadata: HydroIrMetadata,
1852 },
1853
1854 SingletonSource {
1855 value: DebugExpr,
1856 first_tick_only: bool,
1857 metadata: HydroIrMetadata,
1858 },
1859
1860 CycleSource {
1861 cycle_id: CycleId,
1862 metadata: HydroIrMetadata,
1863 },
1864
1865 Tee {
1866 inner: SharedNode,
1867 metadata: HydroIrMetadata,
1868 },
1869
1870 Partition {
1871 inner: SharedNode,
1872 f: DebugExpr,
1873 is_true: bool,
1874 metadata: HydroIrMetadata,
1875 },
1876
1877 BeginAtomic {
1878 inner: Box<HydroNode>,
1879 metadata: HydroIrMetadata,
1880 },
1881
1882 EndAtomic {
1883 inner: Box<HydroNode>,
1884 metadata: HydroIrMetadata,
1885 },
1886
1887 Batch {
1888 inner: Box<HydroNode>,
1889 metadata: HydroIrMetadata,
1890 },
1891
1892 YieldConcat {
1893 inner: Box<HydroNode>,
1894 metadata: HydroIrMetadata,
1895 },
1896
1897 Chain {
1898 first: Box<HydroNode>,
1899 second: Box<HydroNode>,
1900 metadata: HydroIrMetadata,
1901 },
1902
1903 ChainFirst {
1904 first: Box<HydroNode>,
1905 second: Box<HydroNode>,
1906 metadata: HydroIrMetadata,
1907 },
1908
1909 CrossProduct {
1910 left: Box<HydroNode>,
1911 right: Box<HydroNode>,
1912 metadata: HydroIrMetadata,
1913 },
1914
1915 CrossSingleton {
1916 left: Box<HydroNode>,
1917 right: Box<HydroNode>,
1918 metadata: HydroIrMetadata,
1919 },
1920
1921 Join {
1922 left: Box<HydroNode>,
1923 right: Box<HydroNode>,
1924 metadata: HydroIrMetadata,
1925 },
1926
1927 Difference {
1928 pos: Box<HydroNode>,
1929 neg: Box<HydroNode>,
1930 metadata: HydroIrMetadata,
1931 },
1932
1933 AntiJoin {
1934 pos: Box<HydroNode>,
1935 neg: Box<HydroNode>,
1936 metadata: HydroIrMetadata,
1937 },
1938
1939 ResolveFutures {
1940 input: Box<HydroNode>,
1941 metadata: HydroIrMetadata,
1942 },
1943 ResolveFuturesBlocking {
1944 input: Box<HydroNode>,
1945 metadata: HydroIrMetadata,
1946 },
1947 ResolveFuturesOrdered {
1948 input: Box<HydroNode>,
1949 metadata: HydroIrMetadata,
1950 },
1951
1952 Map {
1953 f: DebugExpr,
1954 input: Box<HydroNode>,
1955 metadata: HydroIrMetadata,
1956 },
1957 FlatMap {
1958 f: DebugExpr,
1959 input: Box<HydroNode>,
1960 metadata: HydroIrMetadata,
1961 },
1962 Filter {
1963 f: DebugExpr,
1964 input: Box<HydroNode>,
1965 metadata: HydroIrMetadata,
1966 },
1967 FilterMap {
1968 f: DebugExpr,
1969 input: Box<HydroNode>,
1970 metadata: HydroIrMetadata,
1971 },
1972
1973 DeferTick {
1974 input: Box<HydroNode>,
1975 metadata: HydroIrMetadata,
1976 },
1977 Enumerate {
1978 input: Box<HydroNode>,
1979 metadata: HydroIrMetadata,
1980 },
1981 Inspect {
1982 f: DebugExpr,
1983 input: Box<HydroNode>,
1984 metadata: HydroIrMetadata,
1985 },
1986
1987 Unique {
1988 input: Box<HydroNode>,
1989 metadata: HydroIrMetadata,
1990 },
1991
1992 Sort {
1993 input: Box<HydroNode>,
1994 metadata: HydroIrMetadata,
1995 },
1996 Fold {
1997 init: DebugExpr,
1998 acc: DebugExpr,
1999 input: Box<HydroNode>,
2000 metadata: HydroIrMetadata,
2001 },
2002
2003 Scan {
2004 init: DebugExpr,
2005 acc: DebugExpr,
2006 input: Box<HydroNode>,
2007 metadata: HydroIrMetadata,
2008 },
2009 FoldKeyed {
2010 init: DebugExpr,
2011 acc: DebugExpr,
2012 input: Box<HydroNode>,
2013 metadata: HydroIrMetadata,
2014 },
2015
2016 Reduce {
2017 f: DebugExpr,
2018 input: Box<HydroNode>,
2019 metadata: HydroIrMetadata,
2020 },
2021 ReduceKeyed {
2022 f: DebugExpr,
2023 input: Box<HydroNode>,
2024 metadata: HydroIrMetadata,
2025 },
2026 ReduceKeyedWatermark {
2027 f: DebugExpr,
2028 input: Box<HydroNode>,
2029 watermark: Box<HydroNode>,
2030 metadata: HydroIrMetadata,
2031 },
2032
2033 Network {
2034 name: Option<String>,
2035 networking_info: crate::networking::NetworkingInfo,
2036 serialize_fn: Option<DebugExpr>,
2037 instantiate_fn: DebugInstantiate,
2038 deserialize_fn: Option<DebugExpr>,
2039 input: Box<HydroNode>,
2040 metadata: HydroIrMetadata,
2041 },
2042
2043 ExternalInput {
2044 from_external_key: LocationKey,
2045 from_port_id: ExternalPortId,
2046 from_many: bool,
2047 codec_type: DebugType,
2048 port_hint: NetworkHint,
2049 instantiate_fn: DebugInstantiate,
2050 deserialize_fn: Option<DebugExpr>,
2051 metadata: HydroIrMetadata,
2052 },
2053
2054 Counter {
2055 tag: String,
2056 duration: DebugExpr,
2057 prefix: String,
2058 input: Box<HydroNode>,
2059 metadata: HydroIrMetadata,
2060 },
2061}
2062
2063pub type SeenSharedNodes = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
2064pub type SeenSharedNodeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
2065
2066impl HydroNode {
2067 pub fn transform_bottom_up(
2068 &mut self,
2069 transform: &mut impl FnMut(&mut HydroNode),
2070 seen_tees: &mut SeenSharedNodes,
2071 check_well_formed: bool,
2072 ) {
2073 self.transform_children(
2074 |n, s| n.transform_bottom_up(transform, s, check_well_formed),
2075 seen_tees,
2076 );
2077
2078 transform(self);
2079
2080 let self_location = self.metadata().location_id.root();
2081
2082 if check_well_formed {
2083 match &*self {
2084 HydroNode::Network { .. } => {}
2085 _ => {
2086 self.input_metadata().iter().for_each(|i| {
2087 if i.location_id.root() != self_location {
2088 panic!(
2089 "Mismatching IR locations, child: {:?} ({:?}) of: {:?} ({:?})",
2090 i,
2091 i.location_id.root(),
2092 self,
2093 self_location
2094 )
2095 }
2096 });
2097 }
2098 }
2099 }
2100 }
2101
2102 #[inline(always)]
2103 pub fn transform_children(
2104 &mut self,
2105 mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
2106 seen_tees: &mut SeenSharedNodes,
2107 ) {
2108 match self {
2109 HydroNode::Placeholder => {
2110 panic!();
2111 }
2112
2113 HydroNode::Source { .. }
2114 | HydroNode::SingletonSource { .. }
2115 | HydroNode::CycleSource { .. }
2116 | HydroNode::ExternalInput { .. } => {}
2117
2118 HydroNode::Tee { inner, .. } => {
2119 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2120 *inner = SharedNode(transformed.clone());
2121 } else {
2122 let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2123 seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
2124 let mut orig = inner.0.replace(HydroNode::Placeholder);
2125 transform(&mut orig, seen_tees);
2126 *transformed_cell.borrow_mut() = orig;
2127 *inner = SharedNode(transformed_cell);
2128 }
2129 }
2130
2131 HydroNode::Partition { inner, .. } => {
2132 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2133 *inner = SharedNode(transformed.clone());
2134 } else {
2135 let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2136 seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
2137 let mut orig = inner.0.replace(HydroNode::Placeholder);
2138 transform(&mut orig, seen_tees);
2139 *transformed_cell.borrow_mut() = orig;
2140 *inner = SharedNode(transformed_cell);
2141 }
2142 }
2143
2144 HydroNode::Cast { inner, .. }
2145 | HydroNode::ObserveNonDet { inner, .. }
2146 | HydroNode::BeginAtomic { inner, .. }
2147 | HydroNode::EndAtomic { inner, .. }
2148 | HydroNode::Batch { inner, .. }
2149 | HydroNode::YieldConcat { inner, .. } => {
2150 transform(inner.as_mut(), seen_tees);
2151 }
2152
2153 HydroNode::Chain { first, second, .. } => {
2154 transform(first.as_mut(), seen_tees);
2155 transform(second.as_mut(), seen_tees);
2156 }
2157
2158 HydroNode::ChainFirst { first, second, .. } => {
2159 transform(first.as_mut(), seen_tees);
2160 transform(second.as_mut(), seen_tees);
2161 }
2162
2163 HydroNode::CrossSingleton { left, right, .. }
2164 | HydroNode::CrossProduct { left, right, .. }
2165 | HydroNode::Join { left, right, .. } => {
2166 transform(left.as_mut(), seen_tees);
2167 transform(right.as_mut(), seen_tees);
2168 }
2169
2170 HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
2171 transform(pos.as_mut(), seen_tees);
2172 transform(neg.as_mut(), seen_tees);
2173 }
2174
2175 HydroNode::ReduceKeyedWatermark {
2176 input, watermark, ..
2177 } => {
2178 transform(input.as_mut(), seen_tees);
2179 transform(watermark.as_mut(), seen_tees);
2180 }
2181
2182 HydroNode::Map { input, .. }
2183 | HydroNode::ResolveFutures { input, .. }
2184 | HydroNode::ResolveFuturesBlocking { input, .. }
2185 | HydroNode::ResolveFuturesOrdered { input, .. }
2186 | HydroNode::FlatMap { input, .. }
2187 | HydroNode::Filter { input, .. }
2188 | HydroNode::FilterMap { input, .. }
2189 | HydroNode::Sort { input, .. }
2190 | HydroNode::DeferTick { input, .. }
2191 | HydroNode::Enumerate { input, .. }
2192 | HydroNode::Inspect { input, .. }
2193 | HydroNode::Unique { input, .. }
2194 | HydroNode::Network { input, .. }
2195 | HydroNode::Fold { input, .. }
2196 | HydroNode::Scan { input, .. }
2197 | HydroNode::FoldKeyed { input, .. }
2198 | HydroNode::Reduce { input, .. }
2199 | HydroNode::ReduceKeyed { input, .. }
2200 | HydroNode::Counter { input, .. } => {
2201 transform(input.as_mut(), seen_tees);
2202 }
2203 }
2204 }
2205
2206 pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroNode {
2207 match self {
2208 HydroNode::Placeholder => HydroNode::Placeholder,
2209 HydroNode::Cast { inner, metadata } => HydroNode::Cast {
2210 inner: Box::new(inner.deep_clone(seen_tees)),
2211 metadata: metadata.clone(),
2212 },
2213 HydroNode::ObserveNonDet {
2214 inner,
2215 trusted,
2216 metadata,
2217 } => HydroNode::ObserveNonDet {
2218 inner: Box::new(inner.deep_clone(seen_tees)),
2219 trusted: *trusted,
2220 metadata: metadata.clone(),
2221 },
2222 HydroNode::Source { source, metadata } => HydroNode::Source {
2223 source: source.clone(),
2224 metadata: metadata.clone(),
2225 },
2226 HydroNode::SingletonSource {
2227 value,
2228 first_tick_only,
2229 metadata,
2230 } => HydroNode::SingletonSource {
2231 value: value.clone(),
2232 first_tick_only: *first_tick_only,
2233 metadata: metadata.clone(),
2234 },
2235 HydroNode::CycleSource { cycle_id, metadata } => HydroNode::CycleSource {
2236 cycle_id: *cycle_id,
2237 metadata: metadata.clone(),
2238 },
2239 HydroNode::Tee { inner, metadata } => {
2240 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2241 HydroNode::Tee {
2242 inner: SharedNode(transformed.clone()),
2243 metadata: metadata.clone(),
2244 }
2245 } else {
2246 let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2247 seen_tees.insert(inner.as_ptr(), new_rc.clone());
2248 let cloned = inner.0.borrow().deep_clone(seen_tees);
2249 *new_rc.borrow_mut() = cloned;
2250 HydroNode::Tee {
2251 inner: SharedNode(new_rc),
2252 metadata: metadata.clone(),
2253 }
2254 }
2255 }
2256 HydroNode::Partition {
2257 inner,
2258 f,
2259 is_true,
2260 metadata,
2261 } => {
2262 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2263 HydroNode::Partition {
2264 inner: SharedNode(transformed.clone()),
2265 f: f.clone(),
2266 is_true: *is_true,
2267 metadata: metadata.clone(),
2268 }
2269 } else {
2270 let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2271 seen_tees.insert(inner.as_ptr(), new_rc.clone());
2272 let cloned = inner.0.borrow().deep_clone(seen_tees);
2273 *new_rc.borrow_mut() = cloned;
2274 HydroNode::Partition {
2275 inner: SharedNode(new_rc),
2276 f: f.clone(),
2277 is_true: *is_true,
2278 metadata: metadata.clone(),
2279 }
2280 }
2281 }
2282 HydroNode::YieldConcat { inner, metadata } => HydroNode::YieldConcat {
2283 inner: Box::new(inner.deep_clone(seen_tees)),
2284 metadata: metadata.clone(),
2285 },
2286 HydroNode::BeginAtomic { inner, metadata } => HydroNode::BeginAtomic {
2287 inner: Box::new(inner.deep_clone(seen_tees)),
2288 metadata: metadata.clone(),
2289 },
2290 HydroNode::EndAtomic { inner, metadata } => HydroNode::EndAtomic {
2291 inner: Box::new(inner.deep_clone(seen_tees)),
2292 metadata: metadata.clone(),
2293 },
2294 HydroNode::Batch { inner, metadata } => HydroNode::Batch {
2295 inner: Box::new(inner.deep_clone(seen_tees)),
2296 metadata: metadata.clone(),
2297 },
2298 HydroNode::Chain {
2299 first,
2300 second,
2301 metadata,
2302 } => HydroNode::Chain {
2303 first: Box::new(first.deep_clone(seen_tees)),
2304 second: Box::new(second.deep_clone(seen_tees)),
2305 metadata: metadata.clone(),
2306 },
2307 HydroNode::ChainFirst {
2308 first,
2309 second,
2310 metadata,
2311 } => HydroNode::ChainFirst {
2312 first: Box::new(first.deep_clone(seen_tees)),
2313 second: Box::new(second.deep_clone(seen_tees)),
2314 metadata: metadata.clone(),
2315 },
2316 HydroNode::CrossProduct {
2317 left,
2318 right,
2319 metadata,
2320 } => HydroNode::CrossProduct {
2321 left: Box::new(left.deep_clone(seen_tees)),
2322 right: Box::new(right.deep_clone(seen_tees)),
2323 metadata: metadata.clone(),
2324 },
2325 HydroNode::CrossSingleton {
2326 left,
2327 right,
2328 metadata,
2329 } => HydroNode::CrossSingleton {
2330 left: Box::new(left.deep_clone(seen_tees)),
2331 right: Box::new(right.deep_clone(seen_tees)),
2332 metadata: metadata.clone(),
2333 },
2334 HydroNode::Join {
2335 left,
2336 right,
2337 metadata,
2338 } => HydroNode::Join {
2339 left: Box::new(left.deep_clone(seen_tees)),
2340 right: Box::new(right.deep_clone(seen_tees)),
2341 metadata: metadata.clone(),
2342 },
2343 HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
2344 pos: Box::new(pos.deep_clone(seen_tees)),
2345 neg: Box::new(neg.deep_clone(seen_tees)),
2346 metadata: metadata.clone(),
2347 },
2348 HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
2349 pos: Box::new(pos.deep_clone(seen_tees)),
2350 neg: Box::new(neg.deep_clone(seen_tees)),
2351 metadata: metadata.clone(),
2352 },
2353 HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
2354 input: Box::new(input.deep_clone(seen_tees)),
2355 metadata: metadata.clone(),
2356 },
2357 HydroNode::ResolveFuturesBlocking { input, metadata } => {
2358 HydroNode::ResolveFuturesBlocking {
2359 input: Box::new(input.deep_clone(seen_tees)),
2360 metadata: metadata.clone(),
2361 }
2362 }
2363 HydroNode::ResolveFuturesOrdered { input, metadata } => {
2364 HydroNode::ResolveFuturesOrdered {
2365 input: Box::new(input.deep_clone(seen_tees)),
2366 metadata: metadata.clone(),
2367 }
2368 }
2369 HydroNode::Map { f, input, metadata } => HydroNode::Map {
2370 f: f.clone(),
2371 input: Box::new(input.deep_clone(seen_tees)),
2372 metadata: metadata.clone(),
2373 },
2374 HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
2375 f: f.clone(),
2376 input: Box::new(input.deep_clone(seen_tees)),
2377 metadata: metadata.clone(),
2378 },
2379 HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
2380 f: f.clone(),
2381 input: Box::new(input.deep_clone(seen_tees)),
2382 metadata: metadata.clone(),
2383 },
2384 HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
2385 f: f.clone(),
2386 input: Box::new(input.deep_clone(seen_tees)),
2387 metadata: metadata.clone(),
2388 },
2389 HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
2390 input: Box::new(input.deep_clone(seen_tees)),
2391 metadata: metadata.clone(),
2392 },
2393 HydroNode::Enumerate { input, metadata } => HydroNode::Enumerate {
2394 input: Box::new(input.deep_clone(seen_tees)),
2395 metadata: metadata.clone(),
2396 },
2397 HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
2398 f: f.clone(),
2399 input: Box::new(input.deep_clone(seen_tees)),
2400 metadata: metadata.clone(),
2401 },
2402 HydroNode::Unique { input, metadata } => HydroNode::Unique {
2403 input: Box::new(input.deep_clone(seen_tees)),
2404 metadata: metadata.clone(),
2405 },
2406 HydroNode::Sort { input, metadata } => HydroNode::Sort {
2407 input: Box::new(input.deep_clone(seen_tees)),
2408 metadata: metadata.clone(),
2409 },
2410 HydroNode::Fold {
2411 init,
2412 acc,
2413 input,
2414 metadata,
2415 } => HydroNode::Fold {
2416 init: init.clone(),
2417 acc: acc.clone(),
2418 input: Box::new(input.deep_clone(seen_tees)),
2419 metadata: metadata.clone(),
2420 },
2421 HydroNode::Scan {
2422 init,
2423 acc,
2424 input,
2425 metadata,
2426 } => HydroNode::Scan {
2427 init: init.clone(),
2428 acc: acc.clone(),
2429 input: Box::new(input.deep_clone(seen_tees)),
2430 metadata: metadata.clone(),
2431 },
2432 HydroNode::FoldKeyed {
2433 init,
2434 acc,
2435 input,
2436 metadata,
2437 } => HydroNode::FoldKeyed {
2438 init: init.clone(),
2439 acc: acc.clone(),
2440 input: Box::new(input.deep_clone(seen_tees)),
2441 metadata: metadata.clone(),
2442 },
2443 HydroNode::ReduceKeyedWatermark {
2444 f,
2445 input,
2446 watermark,
2447 metadata,
2448 } => HydroNode::ReduceKeyedWatermark {
2449 f: f.clone(),
2450 input: Box::new(input.deep_clone(seen_tees)),
2451 watermark: Box::new(watermark.deep_clone(seen_tees)),
2452 metadata: metadata.clone(),
2453 },
2454 HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
2455 f: f.clone(),
2456 input: Box::new(input.deep_clone(seen_tees)),
2457 metadata: metadata.clone(),
2458 },
2459 HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
2460 f: f.clone(),
2461 input: Box::new(input.deep_clone(seen_tees)),
2462 metadata: metadata.clone(),
2463 },
2464 HydroNode::Network {
2465 name,
2466 networking_info,
2467 serialize_fn,
2468 instantiate_fn,
2469 deserialize_fn,
2470 input,
2471 metadata,
2472 } => HydroNode::Network {
2473 name: name.clone(),
2474 networking_info: networking_info.clone(),
2475 serialize_fn: serialize_fn.clone(),
2476 instantiate_fn: instantiate_fn.clone(),
2477 deserialize_fn: deserialize_fn.clone(),
2478 input: Box::new(input.deep_clone(seen_tees)),
2479 metadata: metadata.clone(),
2480 },
2481 HydroNode::ExternalInput {
2482 from_external_key,
2483 from_port_id,
2484 from_many,
2485 codec_type,
2486 port_hint,
2487 instantiate_fn,
2488 deserialize_fn,
2489 metadata,
2490 } => HydroNode::ExternalInput {
2491 from_external_key: *from_external_key,
2492 from_port_id: *from_port_id,
2493 from_many: *from_many,
2494 codec_type: codec_type.clone(),
2495 port_hint: *port_hint,
2496 instantiate_fn: instantiate_fn.clone(),
2497 deserialize_fn: deserialize_fn.clone(),
2498 metadata: metadata.clone(),
2499 },
2500 HydroNode::Counter {
2501 tag,
2502 duration,
2503 prefix,
2504 input,
2505 metadata,
2506 } => HydroNode::Counter {
2507 tag: tag.clone(),
2508 duration: duration.clone(),
2509 prefix: prefix.clone(),
2510 input: Box::new(input.deep_clone(seen_tees)),
2511 metadata: metadata.clone(),
2512 },
2513 }
2514 }
2515
2516 #[cfg(feature = "build")]
2517 pub fn emit_core(
2518 &mut self,
2519 builders_or_callback: &mut BuildersOrCallback<
2520 impl FnMut(&mut HydroRoot, &mut usize),
2521 impl FnMut(&mut HydroNode, &mut usize),
2522 >,
2523 seen_tees: &mut SeenSharedNodes,
2524 built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
2525 next_stmt_id: &mut usize,
2526 ) -> syn::Ident {
2527 let mut ident_stack: Vec<syn::Ident> = Vec::new();
2528
2529 self.transform_bottom_up(
2530 &mut |node: &mut HydroNode| {
2531 let out_location = node.metadata().location_id.clone();
2532 match node {
2533 HydroNode::Placeholder => {
2534 panic!()
2535 }
2536
2537 HydroNode::Cast { .. } => {
2538 match builders_or_callback {
2541 BuildersOrCallback::Builders(_) => {}
2542 BuildersOrCallback::Callback(_, node_callback) => {
2543 node_callback(node, next_stmt_id);
2544 }
2545 }
2546
2547 *next_stmt_id += 1;
2548 }
2550
2551 HydroNode::ObserveNonDet {
2552 inner,
2553 trusted,
2554 metadata,
2555 ..
2556 } => {
2557 let inner_ident = ident_stack.pop().unwrap();
2558
2559 let observe_ident =
2560 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2561
2562 match builders_or_callback {
2563 BuildersOrCallback::Builders(graph_builders) => {
2564 graph_builders.observe_nondet(
2565 *trusted,
2566 &inner.metadata().location_id,
2567 inner_ident,
2568 &inner.metadata().collection_kind,
2569 &observe_ident,
2570 &metadata.collection_kind,
2571 &metadata.op,
2572 );
2573 }
2574 BuildersOrCallback::Callback(_, node_callback) => {
2575 node_callback(node, next_stmt_id);
2576 }
2577 }
2578
2579 *next_stmt_id += 1;
2580
2581 ident_stack.push(observe_ident);
2582 }
2583
2584 HydroNode::Batch {
2585 inner, metadata, ..
2586 } => {
2587 let inner_ident = ident_stack.pop().unwrap();
2588
2589 let batch_ident =
2590 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2591
2592 match builders_or_callback {
2593 BuildersOrCallback::Builders(graph_builders) => {
2594 graph_builders.batch(
2595 inner_ident,
2596 &inner.metadata().location_id,
2597 &inner.metadata().collection_kind,
2598 &batch_ident,
2599 &out_location,
2600 &metadata.op,
2601 );
2602 }
2603 BuildersOrCallback::Callback(_, node_callback) => {
2604 node_callback(node, next_stmt_id);
2605 }
2606 }
2607
2608 *next_stmt_id += 1;
2609
2610 ident_stack.push(batch_ident);
2611 }
2612
2613 HydroNode::YieldConcat { inner, .. } => {
2614 let inner_ident = ident_stack.pop().unwrap();
2615
2616 let yield_ident =
2617 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2618
2619 match builders_or_callback {
2620 BuildersOrCallback::Builders(graph_builders) => {
2621 graph_builders.yield_from_tick(
2622 inner_ident,
2623 &inner.metadata().location_id,
2624 &inner.metadata().collection_kind,
2625 &yield_ident,
2626 &out_location,
2627 );
2628 }
2629 BuildersOrCallback::Callback(_, node_callback) => {
2630 node_callback(node, next_stmt_id);
2631 }
2632 }
2633
2634 *next_stmt_id += 1;
2635
2636 ident_stack.push(yield_ident);
2637 }
2638
2639 HydroNode::BeginAtomic { inner, metadata } => {
2640 let inner_ident = ident_stack.pop().unwrap();
2641
2642 let begin_ident =
2643 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2644
2645 match builders_or_callback {
2646 BuildersOrCallback::Builders(graph_builders) => {
2647 graph_builders.begin_atomic(
2648 inner_ident,
2649 &inner.metadata().location_id,
2650 &inner.metadata().collection_kind,
2651 &begin_ident,
2652 &out_location,
2653 &metadata.op,
2654 );
2655 }
2656 BuildersOrCallback::Callback(_, node_callback) => {
2657 node_callback(node, next_stmt_id);
2658 }
2659 }
2660
2661 *next_stmt_id += 1;
2662
2663 ident_stack.push(begin_ident);
2664 }
2665
2666 HydroNode::EndAtomic { inner, .. } => {
2667 let inner_ident = ident_stack.pop().unwrap();
2668
2669 let end_ident =
2670 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2671
2672 match builders_or_callback {
2673 BuildersOrCallback::Builders(graph_builders) => {
2674 graph_builders.end_atomic(
2675 inner_ident,
2676 &inner.metadata().location_id,
2677 &inner.metadata().collection_kind,
2678 &end_ident,
2679 );
2680 }
2681 BuildersOrCallback::Callback(_, node_callback) => {
2682 node_callback(node, next_stmt_id);
2683 }
2684 }
2685
2686 *next_stmt_id += 1;
2687
2688 ident_stack.push(end_ident);
2689 }
2690
2691 HydroNode::Source {
2692 source, metadata, ..
2693 } => {
2694 if let HydroSource::ExternalNetwork() = source {
2695 ident_stack.push(syn::Ident::new("DUMMY", Span::call_site()));
2696 } else {
2697 let source_ident =
2698 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2699
2700 let source_stmt = match source {
2701 HydroSource::Stream(expr) => {
2702 debug_assert!(metadata.location_id.is_top_level());
2703 parse_quote! {
2704 #source_ident = source_stream(#expr);
2705 }
2706 }
2707
2708 HydroSource::ExternalNetwork() => {
2709 unreachable!()
2710 }
2711
2712 HydroSource::Iter(expr) => {
2713 if metadata.location_id.is_top_level() {
2714 parse_quote! {
2715 #source_ident = source_iter(#expr);
2716 }
2717 } else {
2718 parse_quote! {
2720 #source_ident = source_iter(#expr) -> persist::<'static>();
2721 }
2722 }
2723 }
2724
2725 HydroSource::Spin() => {
2726 debug_assert!(metadata.location_id.is_top_level());
2727 parse_quote! {
2728 #source_ident = spin();
2729 }
2730 }
2731
2732 HydroSource::ClusterMembers(target_loc, state) => {
2733 debug_assert!(metadata.location_id.is_top_level());
2734
2735 let members_tee_ident = syn::Ident::new(
2736 &format!(
2737 "__cluster_members_tee_{}_{}",
2738 metadata.location_id.root().key(),
2739 target_loc.key(),
2740 ),
2741 Span::call_site(),
2742 );
2743
2744 match state {
2745 ClusterMembersState::Stream(d) => {
2746 parse_quote! {
2747 #members_tee_ident = source_stream(#d) -> tee();
2748 #source_ident = #members_tee_ident;
2749 }
2750 },
2751 ClusterMembersState::Uninit => syn::parse_quote! {
2752 #source_ident = source_stream(DUMMY);
2753 },
2754 ClusterMembersState::Tee(..) => parse_quote! {
2755 #source_ident = #members_tee_ident;
2756 },
2757 }
2758 }
2759
2760 HydroSource::Embedded(ident) => {
2761 parse_quote! {
2762 #source_ident = source_stream(#ident);
2763 }
2764 }
2765
2766 HydroSource::EmbeddedSingleton(ident) => {
2767 parse_quote! {
2768 #source_ident = source_iter([#ident]);
2769 }
2770 }
2771 };
2772
2773 match builders_or_callback {
2774 BuildersOrCallback::Builders(graph_builders) => {
2775 let builder = graph_builders.get_dfir_mut(&out_location);
2776 builder.add_dfir(source_stmt, None, Some(&next_stmt_id.to_string()));
2777 }
2778 BuildersOrCallback::Callback(_, node_callback) => {
2779 node_callback(node, next_stmt_id);
2780 }
2781 }
2782
2783 *next_stmt_id += 1;
2784
2785 ident_stack.push(source_ident);
2786 }
2787 }
2788
2789 HydroNode::SingletonSource { value, first_tick_only, metadata } => {
2790 let source_ident =
2791 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2792
2793 match builders_or_callback {
2794 BuildersOrCallback::Builders(graph_builders) => {
2795 let builder = graph_builders.get_dfir_mut(&out_location);
2796
2797 if *first_tick_only {
2798 assert!(
2799 !metadata.location_id.is_top_level(),
2800 "first_tick_only SingletonSource must be inside a tick"
2801 );
2802 }
2803
2804 if *first_tick_only
2805 || (metadata.location_id.is_top_level()
2806 && metadata.collection_kind.is_bounded())
2807 {
2808 builder.add_dfir(
2809 parse_quote! {
2810 #source_ident = source_iter([#value]);
2811 },
2812 None,
2813 Some(&next_stmt_id.to_string()),
2814 );
2815 } else {
2816 builder.add_dfir(
2817 parse_quote! {
2818 #source_ident = source_iter([#value]) -> persist::<'static>();
2819 },
2820 None,
2821 Some(&next_stmt_id.to_string()),
2822 );
2823 }
2824 }
2825 BuildersOrCallback::Callback(_, node_callback) => {
2826 node_callback(node, next_stmt_id);
2827 }
2828 }
2829
2830 *next_stmt_id += 1;
2831
2832 ident_stack.push(source_ident);
2833 }
2834
2835 HydroNode::CycleSource { cycle_id, .. } => {
2836 let ident = cycle_id.as_ident();
2837
2838 match builders_or_callback {
2839 BuildersOrCallback::Builders(_) => {}
2840 BuildersOrCallback::Callback(_, node_callback) => {
2841 node_callback(node, next_stmt_id);
2842 }
2843 }
2844
2845 *next_stmt_id += 1;
2847
2848 ident_stack.push(ident);
2849 }
2850
2851 HydroNode::Tee { inner, .. } => {
2852 let ret_ident = if let Some(built_idents) =
2853 built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
2854 {
2855 match builders_or_callback {
2856 BuildersOrCallback::Builders(_) => {}
2857 BuildersOrCallback::Callback(_, node_callback) => {
2858 node_callback(node, next_stmt_id);
2859 }
2860 }
2861
2862 built_idents[0].clone()
2863 } else {
2864 let inner_ident = ident_stack.pop().unwrap();
2867
2868 let tee_ident =
2869 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2870
2871 built_tees.insert(
2872 inner.0.as_ref() as *const RefCell<HydroNode>,
2873 vec![tee_ident.clone()],
2874 );
2875
2876 match builders_or_callback {
2877 BuildersOrCallback::Builders(graph_builders) => {
2878 let builder = graph_builders.get_dfir_mut(&out_location);
2879 builder.add_dfir(
2880 parse_quote! {
2881 #tee_ident = #inner_ident -> tee();
2882 },
2883 None,
2884 Some(&next_stmt_id.to_string()),
2885 );
2886 }
2887 BuildersOrCallback::Callback(_, node_callback) => {
2888 node_callback(node, next_stmt_id);
2889 }
2890 }
2891
2892 tee_ident
2893 };
2894
2895 *next_stmt_id += 1;
2899 ident_stack.push(ret_ident);
2900 }
2901
2902 HydroNode::Partition {
2903 inner, f, is_true, ..
2904 } => {
2905 let is_true = *is_true; let ptr = inner.0.as_ref() as *const RefCell<HydroNode>;
2907 let ret_ident = if let Some(built_idents) = built_tees.get(&ptr) {
2908 match builders_or_callback {
2909 BuildersOrCallback::Builders(_) => {}
2910 BuildersOrCallback::Callback(_, node_callback) => {
2911 node_callback(node, next_stmt_id);
2912 }
2913 }
2914
2915 let idx = if is_true { 0 } else { 1 };
2916 built_idents[idx].clone()
2917 } else {
2918 let inner_ident = ident_stack.pop().unwrap();
2921
2922 let partition_ident = syn::Ident::new(
2923 &format!("stream_{}_partition", *next_stmt_id),
2924 Span::call_site(),
2925 );
2926 let true_ident = syn::Ident::new(
2927 &format!("stream_{}_true", *next_stmt_id),
2928 Span::call_site(),
2929 );
2930 let false_ident = syn::Ident::new(
2931 &format!("stream_{}_false", *next_stmt_id),
2932 Span::call_site(),
2933 );
2934
2935 built_tees.insert(
2936 ptr,
2937 vec![true_ident.clone(), false_ident.clone()],
2938 );
2939
2940 match builders_or_callback {
2941 BuildersOrCallback::Builders(graph_builders) => {
2942 let builder = graph_builders.get_dfir_mut(&out_location);
2943 builder.add_dfir(
2944 parse_quote! {
2945 #partition_ident = #inner_ident -> partition(|__item, __num_outputs| if (#f)(__item) { 0_usize } else { 1_usize });
2946 #true_ident = #partition_ident[0];
2947 #false_ident = #partition_ident[1];
2948 },
2949 None,
2950 Some(&next_stmt_id.to_string()),
2951 );
2952 }
2953 BuildersOrCallback::Callback(_, node_callback) => {
2954 node_callback(node, next_stmt_id);
2955 }
2956 }
2957
2958 if is_true { true_ident } else { false_ident }
2959 };
2960
2961 *next_stmt_id += 1;
2962 ident_stack.push(ret_ident);
2963 }
2964
2965 HydroNode::Chain { .. } => {
2966 let second_ident = ident_stack.pop().unwrap();
2968 let first_ident = ident_stack.pop().unwrap();
2969
2970 let chain_ident =
2971 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2972
2973 match builders_or_callback {
2974 BuildersOrCallback::Builders(graph_builders) => {
2975 let builder = graph_builders.get_dfir_mut(&out_location);
2976 builder.add_dfir(
2977 parse_quote! {
2978 #chain_ident = chain();
2979 #first_ident -> [0]#chain_ident;
2980 #second_ident -> [1]#chain_ident;
2981 },
2982 None,
2983 Some(&next_stmt_id.to_string()),
2984 );
2985 }
2986 BuildersOrCallback::Callback(_, node_callback) => {
2987 node_callback(node, next_stmt_id);
2988 }
2989 }
2990
2991 *next_stmt_id += 1;
2992
2993 ident_stack.push(chain_ident);
2994 }
2995
2996 HydroNode::ChainFirst { .. } => {
2997 let second_ident = ident_stack.pop().unwrap();
2998 let first_ident = ident_stack.pop().unwrap();
2999
3000 let chain_ident =
3001 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3002
3003 match builders_or_callback {
3004 BuildersOrCallback::Builders(graph_builders) => {
3005 let builder = graph_builders.get_dfir_mut(&out_location);
3006 builder.add_dfir(
3007 parse_quote! {
3008 #chain_ident = chain_first_n(1);
3009 #first_ident -> [0]#chain_ident;
3010 #second_ident -> [1]#chain_ident;
3011 },
3012 None,
3013 Some(&next_stmt_id.to_string()),
3014 );
3015 }
3016 BuildersOrCallback::Callback(_, node_callback) => {
3017 node_callback(node, next_stmt_id);
3018 }
3019 }
3020
3021 *next_stmt_id += 1;
3022
3023 ident_stack.push(chain_ident);
3024 }
3025
3026 HydroNode::CrossSingleton { right, .. } => {
3027 let right_ident = ident_stack.pop().unwrap();
3028 let left_ident = ident_stack.pop().unwrap();
3029
3030 let cross_ident =
3031 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3032
3033 match builders_or_callback {
3034 BuildersOrCallback::Builders(graph_builders) => {
3035 let builder = graph_builders.get_dfir_mut(&out_location);
3036
3037 if right.metadata().location_id.is_top_level()
3038 && right.metadata().collection_kind.is_bounded()
3039 {
3040 builder.add_dfir(
3041 parse_quote! {
3042 #cross_ident = cross_singleton();
3043 #left_ident -> [input]#cross_ident;
3044 #right_ident -> persist::<'static>() -> [single]#cross_ident;
3045 },
3046 None,
3047 Some(&next_stmt_id.to_string()),
3048 );
3049 } else {
3050 builder.add_dfir(
3051 parse_quote! {
3052 #cross_ident = cross_singleton();
3053 #left_ident -> [input]#cross_ident;
3054 #right_ident -> [single]#cross_ident;
3055 },
3056 None,
3057 Some(&next_stmt_id.to_string()),
3058 );
3059 }
3060 }
3061 BuildersOrCallback::Callback(_, node_callback) => {
3062 node_callback(node, next_stmt_id);
3063 }
3064 }
3065
3066 *next_stmt_id += 1;
3067
3068 ident_stack.push(cross_ident);
3069 }
3070
3071 HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
3072 let operator: syn::Ident = if matches!(node, HydroNode::CrossProduct { .. }) {
3073 parse_quote!(cross_join_multiset)
3074 } else {
3075 parse_quote!(join_multiset)
3076 };
3077
3078 let (HydroNode::CrossProduct { left, right, .. }
3079 | HydroNode::Join { left, right, .. }) = node
3080 else {
3081 unreachable!()
3082 };
3083
3084 let is_top_level = left.metadata().location_id.is_top_level()
3085 && right.metadata().location_id.is_top_level();
3086 let left_lifetime = if left.metadata().location_id.is_top_level() {
3087 quote!('static)
3088 } else {
3089 quote!('tick)
3090 };
3091
3092 let right_lifetime = if right.metadata().location_id.is_top_level() {
3093 quote!('static)
3094 } else {
3095 quote!('tick)
3096 };
3097
3098 let right_ident = ident_stack.pop().unwrap();
3099 let left_ident = ident_stack.pop().unwrap();
3100
3101 let stream_ident =
3102 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3103
3104 match builders_or_callback {
3105 BuildersOrCallback::Builders(graph_builders) => {
3106 let builder = graph_builders.get_dfir_mut(&out_location);
3107 builder.add_dfir(
3108 if is_top_level {
3109 parse_quote! {
3112 #stream_ident = #operator::<#left_lifetime, #right_lifetime>() -> multiset_delta();
3113 #left_ident -> [0]#stream_ident;
3114 #right_ident -> [1]#stream_ident;
3115 }
3116 } else {
3117 parse_quote! {
3118 #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
3119 #left_ident -> [0]#stream_ident;
3120 #right_ident -> [1]#stream_ident;
3121 }
3122 }
3123 ,
3124 None,
3125 Some(&next_stmt_id.to_string()),
3126 );
3127 }
3128 BuildersOrCallback::Callback(_, node_callback) => {
3129 node_callback(node, next_stmt_id);
3130 }
3131 }
3132
3133 *next_stmt_id += 1;
3134
3135 ident_stack.push(stream_ident);
3136 }
3137
3138 HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
3139 let operator: syn::Ident = if matches!(node, HydroNode::Difference { .. }) {
3140 parse_quote!(difference)
3141 } else {
3142 parse_quote!(anti_join)
3143 };
3144
3145 let (HydroNode::Difference { neg, .. } | HydroNode::AntiJoin { neg, .. }) =
3146 node
3147 else {
3148 unreachable!()
3149 };
3150
3151 let neg_lifetime = if neg.metadata().location_id.is_top_level() {
3152 quote!('static)
3153 } else {
3154 quote!('tick)
3155 };
3156
3157 let neg_ident = ident_stack.pop().unwrap();
3158 let pos_ident = ident_stack.pop().unwrap();
3159
3160 let stream_ident =
3161 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3162
3163 match builders_or_callback {
3164 BuildersOrCallback::Builders(graph_builders) => {
3165 let builder = graph_builders.get_dfir_mut(&out_location);
3166 builder.add_dfir(
3167 parse_quote! {
3168 #stream_ident = #operator::<'tick, #neg_lifetime>();
3169 #pos_ident -> [pos]#stream_ident;
3170 #neg_ident -> [neg]#stream_ident;
3171 },
3172 None,
3173 Some(&next_stmt_id.to_string()),
3174 );
3175 }
3176 BuildersOrCallback::Callback(_, node_callback) => {
3177 node_callback(node, next_stmt_id);
3178 }
3179 }
3180
3181 *next_stmt_id += 1;
3182
3183 ident_stack.push(stream_ident);
3184 }
3185
3186 HydroNode::ResolveFutures { .. } => {
3187 let input_ident = ident_stack.pop().unwrap();
3188
3189 let futures_ident =
3190 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3191
3192 match builders_or_callback {
3193 BuildersOrCallback::Builders(graph_builders) => {
3194 let builder = graph_builders.get_dfir_mut(&out_location);
3195 builder.add_dfir(
3196 parse_quote! {
3197 #futures_ident = #input_ident -> resolve_futures();
3198 },
3199 None,
3200 Some(&next_stmt_id.to_string()),
3201 );
3202 }
3203 BuildersOrCallback::Callback(_, node_callback) => {
3204 node_callback(node, next_stmt_id);
3205 }
3206 }
3207
3208 *next_stmt_id += 1;
3209
3210 ident_stack.push(futures_ident);
3211 }
3212
3213 HydroNode::ResolveFuturesBlocking { .. } => {
3214 let input_ident = ident_stack.pop().unwrap();
3215
3216 let futures_ident =
3217 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3218
3219 match builders_or_callback {
3220 BuildersOrCallback::Builders(graph_builders) => {
3221 let builder = graph_builders.get_dfir_mut(&out_location);
3222 builder.add_dfir(
3223 parse_quote! {
3224 #futures_ident = #input_ident -> resolve_futures_blocking();
3225 },
3226 None,
3227 Some(&next_stmt_id.to_string()),
3228 );
3229 }
3230 BuildersOrCallback::Callback(_, node_callback) => {
3231 node_callback(node, next_stmt_id);
3232 }
3233 }
3234
3235 *next_stmt_id += 1;
3236
3237 ident_stack.push(futures_ident);
3238 }
3239
3240 HydroNode::ResolveFuturesOrdered { .. } => {
3241 let input_ident = ident_stack.pop().unwrap();
3242
3243 let futures_ident =
3244 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3245
3246 match builders_or_callback {
3247 BuildersOrCallback::Builders(graph_builders) => {
3248 let builder = graph_builders.get_dfir_mut(&out_location);
3249 builder.add_dfir(
3250 parse_quote! {
3251 #futures_ident = #input_ident -> resolve_futures_ordered();
3252 },
3253 None,
3254 Some(&next_stmt_id.to_string()),
3255 );
3256 }
3257 BuildersOrCallback::Callback(_, node_callback) => {
3258 node_callback(node, next_stmt_id);
3259 }
3260 }
3261
3262 *next_stmt_id += 1;
3263
3264 ident_stack.push(futures_ident);
3265 }
3266
3267 HydroNode::Map { f, .. } => {
3268 let input_ident = ident_stack.pop().unwrap();
3269
3270 let map_ident =
3271 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3272
3273 match builders_or_callback {
3274 BuildersOrCallback::Builders(graph_builders) => {
3275 let builder = graph_builders.get_dfir_mut(&out_location);
3276 builder.add_dfir(
3277 parse_quote! {
3278 #map_ident = #input_ident -> map(#f);
3279 },
3280 None,
3281 Some(&next_stmt_id.to_string()),
3282 );
3283 }
3284 BuildersOrCallback::Callback(_, node_callback) => {
3285 node_callback(node, next_stmt_id);
3286 }
3287 }
3288
3289 *next_stmt_id += 1;
3290
3291 ident_stack.push(map_ident);
3292 }
3293
3294 HydroNode::FlatMap { f, .. } => {
3295 let input_ident = ident_stack.pop().unwrap();
3296
3297 let flat_map_ident =
3298 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3299
3300 match builders_or_callback {
3301 BuildersOrCallback::Builders(graph_builders) => {
3302 let builder = graph_builders.get_dfir_mut(&out_location);
3303 builder.add_dfir(
3304 parse_quote! {
3305 #flat_map_ident = #input_ident -> flat_map(#f);
3306 },
3307 None,
3308 Some(&next_stmt_id.to_string()),
3309 );
3310 }
3311 BuildersOrCallback::Callback(_, node_callback) => {
3312 node_callback(node, next_stmt_id);
3313 }
3314 }
3315
3316 *next_stmt_id += 1;
3317
3318 ident_stack.push(flat_map_ident);
3319 }
3320
3321 HydroNode::Filter { f, .. } => {
3322 let input_ident = ident_stack.pop().unwrap();
3323
3324 let filter_ident =
3325 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3326
3327 match builders_or_callback {
3328 BuildersOrCallback::Builders(graph_builders) => {
3329 let builder = graph_builders.get_dfir_mut(&out_location);
3330 builder.add_dfir(
3331 parse_quote! {
3332 #filter_ident = #input_ident -> filter(#f);
3333 },
3334 None,
3335 Some(&next_stmt_id.to_string()),
3336 );
3337 }
3338 BuildersOrCallback::Callback(_, node_callback) => {
3339 node_callback(node, next_stmt_id);
3340 }
3341 }
3342
3343 *next_stmt_id += 1;
3344
3345 ident_stack.push(filter_ident);
3346 }
3347
3348 HydroNode::FilterMap { f, .. } => {
3349 let input_ident = ident_stack.pop().unwrap();
3350
3351 let filter_map_ident =
3352 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3353
3354 match builders_or_callback {
3355 BuildersOrCallback::Builders(graph_builders) => {
3356 let builder = graph_builders.get_dfir_mut(&out_location);
3357 builder.add_dfir(
3358 parse_quote! {
3359 #filter_map_ident = #input_ident -> filter_map(#f);
3360 },
3361 None,
3362 Some(&next_stmt_id.to_string()),
3363 );
3364 }
3365 BuildersOrCallback::Callback(_, node_callback) => {
3366 node_callback(node, next_stmt_id);
3367 }
3368 }
3369
3370 *next_stmt_id += 1;
3371
3372 ident_stack.push(filter_map_ident);
3373 }
3374
3375 HydroNode::Sort { .. } => {
3376 let input_ident = ident_stack.pop().unwrap();
3377
3378 let sort_ident =
3379 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3380
3381 match builders_or_callback {
3382 BuildersOrCallback::Builders(graph_builders) => {
3383 let builder = graph_builders.get_dfir_mut(&out_location);
3384 builder.add_dfir(
3385 parse_quote! {
3386 #sort_ident = #input_ident -> sort();
3387 },
3388 None,
3389 Some(&next_stmt_id.to_string()),
3390 );
3391 }
3392 BuildersOrCallback::Callback(_, node_callback) => {
3393 node_callback(node, next_stmt_id);
3394 }
3395 }
3396
3397 *next_stmt_id += 1;
3398
3399 ident_stack.push(sort_ident);
3400 }
3401
3402 HydroNode::DeferTick { .. } => {
3403 let input_ident = ident_stack.pop().unwrap();
3404
3405 let defer_tick_ident =
3406 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3407
3408 match builders_or_callback {
3409 BuildersOrCallback::Builders(graph_builders) => {
3410 let builder = graph_builders.get_dfir_mut(&out_location);
3411 builder.add_dfir(
3412 parse_quote! {
3413 #defer_tick_ident = #input_ident -> defer_tick_lazy();
3414 },
3415 None,
3416 Some(&next_stmt_id.to_string()),
3417 );
3418 }
3419 BuildersOrCallback::Callback(_, node_callback) => {
3420 node_callback(node, next_stmt_id);
3421 }
3422 }
3423
3424 *next_stmt_id += 1;
3425
3426 ident_stack.push(defer_tick_ident);
3427 }
3428
3429 HydroNode::Enumerate { input, .. } => {
3430 let input_ident = ident_stack.pop().unwrap();
3431
3432 let enumerate_ident =
3433 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3434
3435 match builders_or_callback {
3436 BuildersOrCallback::Builders(graph_builders) => {
3437 let builder = graph_builders.get_dfir_mut(&out_location);
3438 let lifetime = if input.metadata().location_id.is_top_level() {
3439 quote!('static)
3440 } else {
3441 quote!('tick)
3442 };
3443 builder.add_dfir(
3444 parse_quote! {
3445 #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
3446 },
3447 None,
3448 Some(&next_stmt_id.to_string()),
3449 );
3450 }
3451 BuildersOrCallback::Callback(_, node_callback) => {
3452 node_callback(node, next_stmt_id);
3453 }
3454 }
3455
3456 *next_stmt_id += 1;
3457
3458 ident_stack.push(enumerate_ident);
3459 }
3460
3461 HydroNode::Inspect { f, .. } => {
3462 let input_ident = ident_stack.pop().unwrap();
3463
3464 let inspect_ident =
3465 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3466
3467 match builders_or_callback {
3468 BuildersOrCallback::Builders(graph_builders) => {
3469 let builder = graph_builders.get_dfir_mut(&out_location);
3470 builder.add_dfir(
3471 parse_quote! {
3472 #inspect_ident = #input_ident -> inspect(#f);
3473 },
3474 None,
3475 Some(&next_stmt_id.to_string()),
3476 );
3477 }
3478 BuildersOrCallback::Callback(_, node_callback) => {
3479 node_callback(node, next_stmt_id);
3480 }
3481 }
3482
3483 *next_stmt_id += 1;
3484
3485 ident_stack.push(inspect_ident);
3486 }
3487
3488 HydroNode::Unique { input, .. } => {
3489 let input_ident = ident_stack.pop().unwrap();
3490
3491 let unique_ident =
3492 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3493
3494 match builders_or_callback {
3495 BuildersOrCallback::Builders(graph_builders) => {
3496 let builder = graph_builders.get_dfir_mut(&out_location);
3497 let lifetime = if input.metadata().location_id.is_top_level() {
3498 quote!('static)
3499 } else {
3500 quote!('tick)
3501 };
3502
3503 builder.add_dfir(
3504 parse_quote! {
3505 #unique_ident = #input_ident -> unique::<#lifetime>();
3506 },
3507 None,
3508 Some(&next_stmt_id.to_string()),
3509 );
3510 }
3511 BuildersOrCallback::Callback(_, node_callback) => {
3512 node_callback(node, next_stmt_id);
3513 }
3514 }
3515
3516 *next_stmt_id += 1;
3517
3518 ident_stack.push(unique_ident);
3519 }
3520
3521 HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } | HydroNode::Scan { .. } => {
3522 let operator: syn::Ident = if let HydroNode::Fold { input, .. } = node {
3523 if input.metadata().location_id.is_top_level()
3524 && input.metadata().collection_kind.is_bounded()
3525 {
3526 parse_quote!(fold_no_replay)
3527 } else {
3528 parse_quote!(fold)
3529 }
3530 } else if matches!(node, HydroNode::Scan { .. }) {
3531 parse_quote!(scan)
3532 } else if let HydroNode::FoldKeyed { input, .. } = node {
3533 if input.metadata().location_id.is_top_level()
3534 && input.metadata().collection_kind.is_bounded()
3535 {
3536 todo!("Fold keyed on a top-level bounded collection is not yet supported")
3537 } else {
3538 parse_quote!(fold_keyed)
3539 }
3540 } else {
3541 unreachable!()
3542 };
3543
3544 let (HydroNode::Fold { input, .. }
3545 | HydroNode::FoldKeyed { input, .. }
3546 | HydroNode::Scan { input, .. }) = node
3547 else {
3548 unreachable!()
3549 };
3550
3551 let lifetime = if input.metadata().location_id.is_top_level() {
3552 quote!('static)
3553 } else {
3554 quote!('tick)
3555 };
3556
3557 let input_ident = ident_stack.pop().unwrap();
3558
3559 let (HydroNode::Fold { init, acc, .. }
3560 | HydroNode::FoldKeyed { init, acc, .. }
3561 | HydroNode::Scan { init, acc, .. }) = &*node
3562 else {
3563 unreachable!()
3564 };
3565
3566 let fold_ident =
3567 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3568
3569 match builders_or_callback {
3570 BuildersOrCallback::Builders(graph_builders) => {
3571 if matches!(node, HydroNode::Fold { .. })
3572 && node.metadata().location_id.is_top_level()
3573 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3574 && graph_builders.singleton_intermediates()
3575 && !node.metadata().collection_kind.is_bounded()
3576 {
3577 let builder = graph_builders.get_dfir_mut(&out_location);
3578
3579 let acc: syn::Expr = parse_quote!({
3580 let mut __inner = #acc;
3581 move |__state, __value| {
3582 __inner(__state, __value);
3583 Some(__state.clone())
3584 }
3585 });
3586
3587 builder.add_dfir(
3588 parse_quote! {
3589 source_iter([(#init)()]) -> [0]#fold_ident;
3590 #input_ident -> scan::<#lifetime>(#init, #acc) -> [1]#fold_ident;
3591 #fold_ident = chain();
3592 },
3593 None,
3594 Some(&next_stmt_id.to_string()),
3595 );
3596 } else if matches!(node, HydroNode::FoldKeyed { .. })
3597 && node.metadata().location_id.is_top_level()
3598 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3599 && graph_builders.singleton_intermediates()
3600 && !node.metadata().collection_kind.is_bounded()
3601 {
3602 let builder = graph_builders.get_dfir_mut(&out_location);
3603
3604 let acc: syn::Expr = parse_quote!({
3605 let mut __init = #init;
3606 let mut __inner = #acc;
3607 move |__state, __kv: (_, _)| {
3608 let __state = __state
3610 .entry(::std::clone::Clone::clone(&__kv.0))
3611 .or_insert_with(|| (__init)());
3612 __inner(__state, __kv.1);
3613 Some((__kv.0, ::std::clone::Clone::clone(&*__state)))
3614 }
3615 });
3616
3617 builder.add_dfir(
3618 parse_quote! {
3619 #fold_ident = #input_ident -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #acc);
3620 },
3621 None,
3622 Some(&next_stmt_id.to_string()),
3623 );
3624 } else {
3625 let builder = graph_builders.get_dfir_mut(&out_location);
3626 builder.add_dfir(
3627 parse_quote! {
3628 #fold_ident = #input_ident -> #operator::<#lifetime>(#init, #acc);
3629 },
3630 None,
3631 Some(&next_stmt_id.to_string()),
3632 );
3633 }
3634 }
3635 BuildersOrCallback::Callback(_, node_callback) => {
3636 node_callback(node, next_stmt_id);
3637 }
3638 }
3639
3640 *next_stmt_id += 1;
3641
3642 ident_stack.push(fold_ident);
3643 }
3644
3645 HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
3646 let operator: syn::Ident = if let HydroNode::Reduce { input, .. } = node {
3647 if input.metadata().location_id.is_top_level()
3648 && input.metadata().collection_kind.is_bounded()
3649 {
3650 parse_quote!(reduce_no_replay)
3651 } else {
3652 parse_quote!(reduce)
3653 }
3654 } else if let HydroNode::ReduceKeyed { input, .. } = node {
3655 if input.metadata().location_id.is_top_level()
3656 && input.metadata().collection_kind.is_bounded()
3657 {
3658 todo!(
3659 "Calling keyed reduce on a top-level bounded collection is not supported"
3660 )
3661 } else {
3662 parse_quote!(reduce_keyed)
3663 }
3664 } else {
3665 unreachable!()
3666 };
3667
3668 let (HydroNode::Reduce { input, .. } | HydroNode::ReduceKeyed { input, .. }) = node
3669 else {
3670 unreachable!()
3671 };
3672
3673 let lifetime = if input.metadata().location_id.is_top_level() {
3674 quote!('static)
3675 } else {
3676 quote!('tick)
3677 };
3678
3679 let input_ident = ident_stack.pop().unwrap();
3680
3681 let (HydroNode::Reduce { f, .. } | HydroNode::ReduceKeyed { f, .. }) = &*node
3682 else {
3683 unreachable!()
3684 };
3685
3686 let reduce_ident =
3687 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3688
3689 match builders_or_callback {
3690 BuildersOrCallback::Builders(graph_builders) => {
3691 if matches!(node, HydroNode::Reduce { .. })
3692 && node.metadata().location_id.is_top_level()
3693 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3694 && graph_builders.singleton_intermediates()
3695 && !node.metadata().collection_kind.is_bounded()
3696 {
3697 todo!(
3698 "Reduce with optional intermediates is not yet supported in simulator"
3699 );
3700 } else if matches!(node, HydroNode::ReduceKeyed { .. })
3701 && node.metadata().location_id.is_top_level()
3702 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3703 && graph_builders.singleton_intermediates()
3704 && !node.metadata().collection_kind.is_bounded()
3705 {
3706 todo!(
3707 "Reduce keyed with optional intermediates is not yet supported in simulator"
3708 );
3709 } else {
3710 let builder = graph_builders.get_dfir_mut(&out_location);
3711 builder.add_dfir(
3712 parse_quote! {
3713 #reduce_ident = #input_ident -> #operator::<#lifetime>(#f);
3714 },
3715 None,
3716 Some(&next_stmt_id.to_string()),
3717 );
3718 }
3719 }
3720 BuildersOrCallback::Callback(_, node_callback) => {
3721 node_callback(node, next_stmt_id);
3722 }
3723 }
3724
3725 *next_stmt_id += 1;
3726
3727 ident_stack.push(reduce_ident);
3728 }
3729
3730 HydroNode::ReduceKeyedWatermark {
3731 f,
3732 input,
3733 metadata,
3734 ..
3735 } => {
3736 let lifetime = if input.metadata().location_id.is_top_level() {
3737 quote!('static)
3738 } else {
3739 quote!('tick)
3740 };
3741
3742 let watermark_ident = ident_stack.pop().unwrap();
3744 let input_ident = ident_stack.pop().unwrap();
3745
3746 let chain_ident = syn::Ident::new(
3747 &format!("reduce_keyed_watermark_chain_{}", *next_stmt_id),
3748 Span::call_site(),
3749 );
3750
3751 let fold_ident =
3752 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3753
3754 let agg_operator: syn::Ident = if input.metadata().location_id.is_top_level()
3755 && input.metadata().collection_kind.is_bounded()
3756 {
3757 parse_quote!(fold_no_replay)
3758 } else {
3759 parse_quote!(fold)
3760 };
3761
3762 match builders_or_callback {
3763 BuildersOrCallback::Builders(graph_builders) => {
3764 if metadata.location_id.is_top_level()
3765 && !(matches!(metadata.location_id, LocationId::Atomic(_)))
3766 && graph_builders.singleton_intermediates()
3767 && !metadata.collection_kind.is_bounded()
3768 {
3769 todo!(
3770 "Reduce keyed watermarked on a top-level bounded collection is not yet supported"
3771 )
3772 } else {
3773 let builder = graph_builders.get_dfir_mut(&out_location);
3774 builder.add_dfir(
3775 parse_quote! {
3776 #chain_ident = chain();
3777 #input_ident
3778 -> map(|x| (Some(x), None))
3779 -> [0]#chain_ident;
3780 #watermark_ident
3781 -> map(|watermark| (None, Some(watermark)))
3782 -> [1]#chain_ident;
3783
3784 #fold_ident = #chain_ident
3785 -> #agg_operator::<#lifetime>(|| (::std::collections::HashMap::new(), None), {
3786 let __reduce_keyed_fn = #f;
3787 move |(map, opt_curr_watermark), (opt_payload, opt_watermark)| {
3788 if let Some((k, v)) = opt_payload {
3789 if let Some(curr_watermark) = *opt_curr_watermark {
3790 if k < curr_watermark {
3791 return;
3792 }
3793 }
3794 match map.entry(k) {
3795 ::std::collections::hash_map::Entry::Vacant(e) => {
3796 e.insert(v);
3797 }
3798 ::std::collections::hash_map::Entry::Occupied(mut e) => {
3799 __reduce_keyed_fn(e.get_mut(), v);
3800 }
3801 }
3802 } else {
3803 let watermark = opt_watermark.unwrap();
3804 if let Some(curr_watermark) = *opt_curr_watermark {
3805 if watermark <= curr_watermark {
3806 return;
3807 }
3808 }
3809 *opt_curr_watermark = opt_watermark;
3810 map.retain(|k, _| *k >= watermark);
3811 }
3812 }
3813 })
3814 -> flat_map(|(map, _curr_watermark)| map);
3815 },
3816 None,
3817 Some(&next_stmt_id.to_string()),
3818 );
3819 }
3820 }
3821 BuildersOrCallback::Callback(_, node_callback) => {
3822 node_callback(node, next_stmt_id);
3823 }
3824 }
3825
3826 *next_stmt_id += 1;
3827
3828 ident_stack.push(fold_ident);
3829 }
3830
3831 HydroNode::Network {
3832 networking_info,
3833 serialize_fn: serialize_pipeline,
3834 instantiate_fn,
3835 deserialize_fn: deserialize_pipeline,
3836 input,
3837 ..
3838 } => {
3839 let input_ident = ident_stack.pop().unwrap();
3840
3841 let receiver_stream_ident =
3842 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3843
3844 match builders_or_callback {
3845 BuildersOrCallback::Builders(graph_builders) => {
3846 let (sink_expr, source_expr) = match instantiate_fn {
3847 DebugInstantiate::Building => (
3848 syn::parse_quote!(DUMMY_SINK),
3849 syn::parse_quote!(DUMMY_SOURCE),
3850 ),
3851
3852 DebugInstantiate::Finalized(finalized) => {
3853 (finalized.sink.clone(), finalized.source.clone())
3854 }
3855 };
3856
3857 graph_builders.create_network(
3858 &input.metadata().location_id,
3859 &out_location,
3860 input_ident,
3861 &receiver_stream_ident,
3862 serialize_pipeline.as_ref(),
3863 sink_expr,
3864 source_expr,
3865 deserialize_pipeline.as_ref(),
3866 *next_stmt_id,
3867 networking_info,
3868 );
3869 }
3870 BuildersOrCallback::Callback(_, node_callback) => {
3871 node_callback(node, next_stmt_id);
3872 }
3873 }
3874
3875 *next_stmt_id += 1;
3876
3877 ident_stack.push(receiver_stream_ident);
3878 }
3879
3880 HydroNode::ExternalInput {
3881 instantiate_fn,
3882 deserialize_fn: deserialize_pipeline,
3883 ..
3884 } => {
3885 let receiver_stream_ident =
3886 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3887
3888 match builders_or_callback {
3889 BuildersOrCallback::Builders(graph_builders) => {
3890 let (_, source_expr) = match instantiate_fn {
3891 DebugInstantiate::Building => (
3892 syn::parse_quote!(DUMMY_SINK),
3893 syn::parse_quote!(DUMMY_SOURCE),
3894 ),
3895
3896 DebugInstantiate::Finalized(finalized) => {
3897 (finalized.sink.clone(), finalized.source.clone())
3898 }
3899 };
3900
3901 graph_builders.create_external_source(
3902 &out_location,
3903 source_expr,
3904 &receiver_stream_ident,
3905 deserialize_pipeline.as_ref(),
3906 *next_stmt_id,
3907 );
3908 }
3909 BuildersOrCallback::Callback(_, node_callback) => {
3910 node_callback(node, next_stmt_id);
3911 }
3912 }
3913
3914 *next_stmt_id += 1;
3915
3916 ident_stack.push(receiver_stream_ident);
3917 }
3918
3919 HydroNode::Counter {
3920 tag,
3921 duration,
3922 prefix,
3923 ..
3924 } => {
3925 let input_ident = ident_stack.pop().unwrap();
3926
3927 let counter_ident =
3928 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3929
3930 match builders_or_callback {
3931 BuildersOrCallback::Builders(graph_builders) => {
3932 let arg = format!("{}({})", prefix, tag);
3933 let builder = graph_builders.get_dfir_mut(&out_location);
3934 builder.add_dfir(
3935 parse_quote! {
3936 #counter_ident = #input_ident -> _counter(#arg, #duration);
3937 },
3938 None,
3939 Some(&next_stmt_id.to_string()),
3940 );
3941 }
3942 BuildersOrCallback::Callback(_, node_callback) => {
3943 node_callback(node, next_stmt_id);
3944 }
3945 }
3946
3947 *next_stmt_id += 1;
3948
3949 ident_stack.push(counter_ident);
3950 }
3951 }
3952 },
3953 seen_tees,
3954 false,
3955 );
3956
3957 ident_stack
3958 .pop()
3959 .expect("ident_stack should have exactly one element after traversal")
3960 }
3961
3962 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
3963 match self {
3964 HydroNode::Placeholder => {
3965 panic!()
3966 }
3967 HydroNode::Cast { .. } | HydroNode::ObserveNonDet { .. } => {}
3968 HydroNode::Source { source, .. } => match source {
3969 HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
3970 HydroSource::ExternalNetwork()
3971 | HydroSource::Spin()
3972 | HydroSource::ClusterMembers(_, _)
3973 | HydroSource::Embedded(_)
3974 | HydroSource::EmbeddedSingleton(_) => {} },
3976 HydroNode::SingletonSource { value, .. } => {
3977 transform(value);
3978 }
3979 HydroNode::CycleSource { .. }
3980 | HydroNode::Tee { .. }
3981 | HydroNode::YieldConcat { .. }
3982 | HydroNode::BeginAtomic { .. }
3983 | HydroNode::EndAtomic { .. }
3984 | HydroNode::Batch { .. }
3985 | HydroNode::Chain { .. }
3986 | HydroNode::ChainFirst { .. }
3987 | HydroNode::CrossProduct { .. }
3988 | HydroNode::CrossSingleton { .. }
3989 | HydroNode::ResolveFutures { .. }
3990 | HydroNode::ResolveFuturesBlocking { .. }
3991 | HydroNode::ResolveFuturesOrdered { .. }
3992 | HydroNode::Join { .. }
3993 | HydroNode::Difference { .. }
3994 | HydroNode::AntiJoin { .. }
3995 | HydroNode::DeferTick { .. }
3996 | HydroNode::Enumerate { .. }
3997 | HydroNode::Unique { .. }
3998 | HydroNode::Sort { .. } => {}
3999 HydroNode::Map { f, .. }
4000 | HydroNode::FlatMap { f, .. }
4001 | HydroNode::Filter { f, .. }
4002 | HydroNode::FilterMap { f, .. }
4003 | HydroNode::Inspect { f, .. }
4004 | HydroNode::Partition { f, .. }
4005 | HydroNode::Reduce { f, .. }
4006 | HydroNode::ReduceKeyed { f, .. }
4007 | HydroNode::ReduceKeyedWatermark { f, .. } => {
4008 transform(f);
4009 }
4010 HydroNode::Fold { init, acc, .. }
4011 | HydroNode::Scan { init, acc, .. }
4012 | HydroNode::FoldKeyed { init, acc, .. } => {
4013 transform(init);
4014 transform(acc);
4015 }
4016 HydroNode::Network {
4017 serialize_fn,
4018 deserialize_fn,
4019 ..
4020 } => {
4021 if let Some(serialize_fn) = serialize_fn {
4022 transform(serialize_fn);
4023 }
4024 if let Some(deserialize_fn) = deserialize_fn {
4025 transform(deserialize_fn);
4026 }
4027 }
4028 HydroNode::ExternalInput { deserialize_fn, .. } => {
4029 if let Some(deserialize_fn) = deserialize_fn {
4030 transform(deserialize_fn);
4031 }
4032 }
4033 HydroNode::Counter { duration, .. } => {
4034 transform(duration);
4035 }
4036 }
4037 }
4038
4039 pub fn op_metadata(&self) -> &HydroIrOpMetadata {
4040 &self.metadata().op
4041 }
4042
4043 pub fn metadata(&self) -> &HydroIrMetadata {
4044 match self {
4045 HydroNode::Placeholder => {
4046 panic!()
4047 }
4048 HydroNode::Cast { metadata, .. } => metadata,
4049 HydroNode::ObserveNonDet { metadata, .. } => metadata,
4050 HydroNode::Source { metadata, .. } => metadata,
4051 HydroNode::SingletonSource { metadata, .. } => metadata,
4052 HydroNode::CycleSource { metadata, .. } => metadata,
4053 HydroNode::Tee { metadata, .. } => metadata,
4054 HydroNode::Partition { metadata, .. } => metadata,
4055 HydroNode::YieldConcat { metadata, .. } => metadata,
4056 HydroNode::BeginAtomic { metadata, .. } => metadata,
4057 HydroNode::EndAtomic { metadata, .. } => metadata,
4058 HydroNode::Batch { metadata, .. } => metadata,
4059 HydroNode::Chain { metadata, .. } => metadata,
4060 HydroNode::ChainFirst { metadata, .. } => metadata,
4061 HydroNode::CrossProduct { metadata, .. } => metadata,
4062 HydroNode::CrossSingleton { metadata, .. } => metadata,
4063 HydroNode::Join { metadata, .. } => metadata,
4064 HydroNode::Difference { metadata, .. } => metadata,
4065 HydroNode::AntiJoin { metadata, .. } => metadata,
4066 HydroNode::ResolveFutures { metadata, .. } => metadata,
4067 HydroNode::ResolveFuturesBlocking { metadata, .. } => metadata,
4068 HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
4069 HydroNode::Map { metadata, .. } => metadata,
4070 HydroNode::FlatMap { metadata, .. } => metadata,
4071 HydroNode::Filter { metadata, .. } => metadata,
4072 HydroNode::FilterMap { metadata, .. } => metadata,
4073 HydroNode::DeferTick { metadata, .. } => metadata,
4074 HydroNode::Enumerate { metadata, .. } => metadata,
4075 HydroNode::Inspect { metadata, .. } => metadata,
4076 HydroNode::Unique { metadata, .. } => metadata,
4077 HydroNode::Sort { metadata, .. } => metadata,
4078 HydroNode::Scan { metadata, .. } => metadata,
4079 HydroNode::Fold { metadata, .. } => metadata,
4080 HydroNode::FoldKeyed { metadata, .. } => metadata,
4081 HydroNode::Reduce { metadata, .. } => metadata,
4082 HydroNode::ReduceKeyed { metadata, .. } => metadata,
4083 HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
4084 HydroNode::ExternalInput { metadata, .. } => metadata,
4085 HydroNode::Network { metadata, .. } => metadata,
4086 HydroNode::Counter { metadata, .. } => metadata,
4087 }
4088 }
4089
4090 pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
4091 &mut self.metadata_mut().op
4092 }
4093
4094 pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
4095 match self {
4096 HydroNode::Placeholder => {
4097 panic!()
4098 }
4099 HydroNode::Cast { metadata, .. } => metadata,
4100 HydroNode::ObserveNonDet { metadata, .. } => metadata,
4101 HydroNode::Source { metadata, .. } => metadata,
4102 HydroNode::SingletonSource { metadata, .. } => metadata,
4103 HydroNode::CycleSource { metadata, .. } => metadata,
4104 HydroNode::Tee { metadata, .. } => metadata,
4105 HydroNode::Partition { metadata, .. } => metadata,
4106 HydroNode::YieldConcat { metadata, .. } => metadata,
4107 HydroNode::BeginAtomic { metadata, .. } => metadata,
4108 HydroNode::EndAtomic { metadata, .. } => metadata,
4109 HydroNode::Batch { metadata, .. } => metadata,
4110 HydroNode::Chain { metadata, .. } => metadata,
4111 HydroNode::ChainFirst { metadata, .. } => metadata,
4112 HydroNode::CrossProduct { metadata, .. } => metadata,
4113 HydroNode::CrossSingleton { metadata, .. } => metadata,
4114 HydroNode::Join { metadata, .. } => metadata,
4115 HydroNode::Difference { metadata, .. } => metadata,
4116 HydroNode::AntiJoin { metadata, .. } => metadata,
4117 HydroNode::ResolveFutures { metadata, .. } => metadata,
4118 HydroNode::ResolveFuturesBlocking { metadata, .. } => metadata,
4119 HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
4120 HydroNode::Map { metadata, .. } => metadata,
4121 HydroNode::FlatMap { metadata, .. } => metadata,
4122 HydroNode::Filter { metadata, .. } => metadata,
4123 HydroNode::FilterMap { metadata, .. } => metadata,
4124 HydroNode::DeferTick { metadata, .. } => metadata,
4125 HydroNode::Enumerate { metadata, .. } => metadata,
4126 HydroNode::Inspect { metadata, .. } => metadata,
4127 HydroNode::Unique { metadata, .. } => metadata,
4128 HydroNode::Sort { metadata, .. } => metadata,
4129 HydroNode::Scan { metadata, .. } => metadata,
4130 HydroNode::Fold { metadata, .. } => metadata,
4131 HydroNode::FoldKeyed { metadata, .. } => metadata,
4132 HydroNode::Reduce { metadata, .. } => metadata,
4133 HydroNode::ReduceKeyed { metadata, .. } => metadata,
4134 HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
4135 HydroNode::ExternalInput { metadata, .. } => metadata,
4136 HydroNode::Network { metadata, .. } => metadata,
4137 HydroNode::Counter { metadata, .. } => metadata,
4138 }
4139 }
4140
4141 pub fn input(&self) -> Vec<&HydroNode> {
4142 match self {
4143 HydroNode::Placeholder => {
4144 panic!()
4145 }
4146 HydroNode::Source { .. }
4147 | HydroNode::SingletonSource { .. }
4148 | HydroNode::ExternalInput { .. }
4149 | HydroNode::CycleSource { .. }
4150 | HydroNode::Tee { .. }
4151 | HydroNode::Partition { .. } => {
4152 vec![]
4154 }
4155 HydroNode::Cast { inner, .. }
4156 | HydroNode::ObserveNonDet { inner, .. }
4157 | HydroNode::YieldConcat { inner, .. }
4158 | HydroNode::BeginAtomic { inner, .. }
4159 | HydroNode::EndAtomic { inner, .. }
4160 | HydroNode::Batch { inner, .. } => {
4161 vec![inner]
4162 }
4163 HydroNode::Chain { first, second, .. } => {
4164 vec![first, second]
4165 }
4166 HydroNode::ChainFirst { first, second, .. } => {
4167 vec![first, second]
4168 }
4169 HydroNode::CrossProduct { left, right, .. }
4170 | HydroNode::CrossSingleton { left, right, .. }
4171 | HydroNode::Join { left, right, .. } => {
4172 vec![left, right]
4173 }
4174 HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
4175 vec![pos, neg]
4176 }
4177 HydroNode::Map { input, .. }
4178 | HydroNode::FlatMap { input, .. }
4179 | HydroNode::Filter { input, .. }
4180 | HydroNode::FilterMap { input, .. }
4181 | HydroNode::Sort { input, .. }
4182 | HydroNode::DeferTick { input, .. }
4183 | HydroNode::Enumerate { input, .. }
4184 | HydroNode::Inspect { input, .. }
4185 | HydroNode::Unique { input, .. }
4186 | HydroNode::Network { input, .. }
4187 | HydroNode::Counter { input, .. }
4188 | HydroNode::ResolveFutures { input, .. }
4189 | HydroNode::ResolveFuturesBlocking { input, .. }
4190 | HydroNode::ResolveFuturesOrdered { input, .. }
4191 | HydroNode::Fold { input, .. }
4192 | HydroNode::FoldKeyed { input, .. }
4193 | HydroNode::Reduce { input, .. }
4194 | HydroNode::ReduceKeyed { input, .. }
4195 | HydroNode::Scan { input, .. } => {
4196 vec![input]
4197 }
4198 HydroNode::ReduceKeyedWatermark {
4199 input, watermark, ..
4200 } => {
4201 vec![input, watermark]
4202 }
4203 }
4204 }
4205
4206 pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
4207 self.input()
4208 .iter()
4209 .map(|input_node| input_node.metadata())
4210 .collect()
4211 }
4212
4213 pub fn is_shared_with_others(&self) -> bool {
4217 match self {
4218 HydroNode::Tee { inner, .. } | HydroNode::Partition { inner, .. } => {
4219 Rc::strong_count(&inner.0) > 1
4220 }
4221 _ => false,
4222 }
4223 }
4224
4225 pub fn print_root(&self) -> String {
4226 match self {
4227 HydroNode::Placeholder => {
4228 panic!()
4229 }
4230 HydroNode::Cast { .. } => "Cast()".to_owned(),
4231 HydroNode::ObserveNonDet { .. } => "ObserveNonDet()".to_owned(),
4232 HydroNode::Source { source, .. } => format!("Source({:?})", source),
4233 HydroNode::SingletonSource {
4234 value,
4235 first_tick_only,
4236 ..
4237 } => format!(
4238 "SingletonSource({:?}, first_tick_only={})",
4239 value, first_tick_only
4240 ),
4241 HydroNode::CycleSource { cycle_id, .. } => format!("CycleSource({})", cycle_id),
4242 HydroNode::Tee { inner, .. } => format!("Tee({})", inner.0.borrow().print_root()),
4243 HydroNode::Partition { f, is_true, .. } => {
4244 format!("Partition({:?}, is_true={})", f, is_true)
4245 }
4246 HydroNode::YieldConcat { .. } => "YieldConcat()".to_owned(),
4247 HydroNode::BeginAtomic { .. } => "BeginAtomic()".to_owned(),
4248 HydroNode::EndAtomic { .. } => "EndAtomic()".to_owned(),
4249 HydroNode::Batch { .. } => "Batch()".to_owned(),
4250 HydroNode::Chain { first, second, .. } => {
4251 format!("Chain({}, {})", first.print_root(), second.print_root())
4252 }
4253 HydroNode::ChainFirst { first, second, .. } => {
4254 format!(
4255 "ChainFirst({}, {})",
4256 first.print_root(),
4257 second.print_root()
4258 )
4259 }
4260 HydroNode::CrossProduct { left, right, .. } => {
4261 format!(
4262 "CrossProduct({}, {})",
4263 left.print_root(),
4264 right.print_root()
4265 )
4266 }
4267 HydroNode::CrossSingleton { left, right, .. } => {
4268 format!(
4269 "CrossSingleton({}, {})",
4270 left.print_root(),
4271 right.print_root()
4272 )
4273 }
4274 HydroNode::Join { left, right, .. } => {
4275 format!("Join({}, {})", left.print_root(), right.print_root())
4276 }
4277 HydroNode::Difference { pos, neg, .. } => {
4278 format!("Difference({}, {})", pos.print_root(), neg.print_root())
4279 }
4280 HydroNode::AntiJoin { pos, neg, .. } => {
4281 format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
4282 }
4283 HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_owned(),
4284 HydroNode::ResolveFuturesBlocking { .. } => "ResolveFuturesBlocking()".to_owned(),
4285 HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_owned(),
4286 HydroNode::Map { f, .. } => format!("Map({:?})", f),
4287 HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
4288 HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
4289 HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
4290 HydroNode::DeferTick { .. } => "DeferTick()".to_owned(),
4291 HydroNode::Enumerate { .. } => "Enumerate()".to_owned(),
4292 HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
4293 HydroNode::Unique { .. } => "Unique()".to_owned(),
4294 HydroNode::Sort { .. } => "Sort()".to_owned(),
4295 HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
4296 HydroNode::Scan { init, acc, .. } => format!("Scan({:?}, {:?})", init, acc),
4297 HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
4298 HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
4299 HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
4300 HydroNode::ReduceKeyedWatermark { f, .. } => format!("ReduceKeyedWatermark({:?})", f),
4301 HydroNode::Network { .. } => "Network()".to_owned(),
4302 HydroNode::ExternalInput { .. } => "ExternalInput()".to_owned(),
4303 HydroNode::Counter { tag, duration, .. } => {
4304 format!("Counter({:?}, {:?})", tag, duration)
4305 }
4306 }
4307 }
4308}
4309
4310#[cfg(feature = "build")]
4311fn instantiate_network<'a, D>(
4312 env: &mut D::InstantiateEnv,
4313 from_location: &LocationId,
4314 to_location: &LocationId,
4315 processes: &SparseSecondaryMap<LocationKey, D::Process>,
4316 clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
4317 name: Option<&str>,
4318 networking_info: &crate::networking::NetworkingInfo,
4319) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>)
4320where
4321 D: Deploy<'a>,
4322{
4323 let ((sink, source), connect_fn) = match (from_location, to_location) {
4324 (&LocationId::Process(from), &LocationId::Process(to)) => {
4325 let from_node = processes
4326 .get(from)
4327 .unwrap_or_else(|| {
4328 panic!("A process used in the graph was not instantiated: {}", from)
4329 })
4330 .clone();
4331 let to_node = processes
4332 .get(to)
4333 .unwrap_or_else(|| {
4334 panic!("A process used in the graph was not instantiated: {}", to)
4335 })
4336 .clone();
4337
4338 let sink_port = from_node.next_port();
4339 let source_port = to_node.next_port();
4340
4341 (
4342 D::o2o_sink_source(
4343 env,
4344 &from_node,
4345 &sink_port,
4346 &to_node,
4347 &source_port,
4348 name,
4349 networking_info,
4350 ),
4351 D::o2o_connect(&from_node, &sink_port, &to_node, &source_port),
4352 )
4353 }
4354 (&LocationId::Process(from), &LocationId::Cluster(to)) => {
4355 let from_node = processes
4356 .get(from)
4357 .unwrap_or_else(|| {
4358 panic!("A process used in the graph was not instantiated: {}", from)
4359 })
4360 .clone();
4361 let to_node = clusters
4362 .get(to)
4363 .unwrap_or_else(|| {
4364 panic!("A cluster used in the graph was not instantiated: {}", to)
4365 })
4366 .clone();
4367
4368 let sink_port = from_node.next_port();
4369 let source_port = to_node.next_port();
4370
4371 (
4372 D::o2m_sink_source(
4373 env,
4374 &from_node,
4375 &sink_port,
4376 &to_node,
4377 &source_port,
4378 name,
4379 networking_info,
4380 ),
4381 D::o2m_connect(&from_node, &sink_port, &to_node, &source_port),
4382 )
4383 }
4384 (&LocationId::Cluster(from), &LocationId::Process(to)) => {
4385 let from_node = clusters
4386 .get(from)
4387 .unwrap_or_else(|| {
4388 panic!("A cluster used in the graph was not instantiated: {}", from)
4389 })
4390 .clone();
4391 let to_node = processes
4392 .get(to)
4393 .unwrap_or_else(|| {
4394 panic!("A process used in the graph was not instantiated: {}", to)
4395 })
4396 .clone();
4397
4398 let sink_port = from_node.next_port();
4399 let source_port = to_node.next_port();
4400
4401 (
4402 D::m2o_sink_source(
4403 env,
4404 &from_node,
4405 &sink_port,
4406 &to_node,
4407 &source_port,
4408 name,
4409 networking_info,
4410 ),
4411 D::m2o_connect(&from_node, &sink_port, &to_node, &source_port),
4412 )
4413 }
4414 (&LocationId::Cluster(from), &LocationId::Cluster(to)) => {
4415 let from_node = clusters
4416 .get(from)
4417 .unwrap_or_else(|| {
4418 panic!("A cluster used in the graph was not instantiated: {}", from)
4419 })
4420 .clone();
4421 let to_node = clusters
4422 .get(to)
4423 .unwrap_or_else(|| {
4424 panic!("A cluster used in the graph was not instantiated: {}", to)
4425 })
4426 .clone();
4427
4428 let sink_port = from_node.next_port();
4429 let source_port = to_node.next_port();
4430
4431 (
4432 D::m2m_sink_source(
4433 env,
4434 &from_node,
4435 &sink_port,
4436 &to_node,
4437 &source_port,
4438 name,
4439 networking_info,
4440 ),
4441 D::m2m_connect(&from_node, &sink_port, &to_node, &source_port),
4442 )
4443 }
4444 (LocationId::Tick(_, _), _) => panic!(),
4445 (_, LocationId::Tick(_, _)) => panic!(),
4446 (LocationId::Atomic(_), _) => panic!(),
4447 (_, LocationId::Atomic(_)) => panic!(),
4448 };
4449 (sink, source, connect_fn)
4450}
4451
4452#[cfg(test)]
4453mod test {
4454 use std::mem::size_of;
4455
4456 use stageleft::{QuotedWithContext, q};
4457
4458 use super::*;
4459
4460 #[test]
4461 #[cfg_attr(
4462 not(feature = "build"),
4463 ignore = "expects inclusion of feature-gated fields"
4464 )]
4465 fn hydro_node_size() {
4466 assert_eq!(size_of::<HydroNode>(), 248);
4467 }
4468
4469 #[test]
4470 #[cfg_attr(
4471 not(feature = "build"),
4472 ignore = "expects inclusion of feature-gated fields"
4473 )]
4474 fn hydro_root_size() {
4475 assert_eq!(size_of::<HydroRoot>(), 136);
4476 }
4477
4478 #[test]
4479 fn test_simplify_q_macro_basic() {
4480 let simple_expr: syn::Expr = syn::parse_str("x + y").unwrap();
4482 let result = simplify_q_macro(simple_expr.clone());
4483 assert_eq!(result, simple_expr);
4484 }
4485
4486 #[test]
4487 fn test_simplify_q_macro_actual_stageleft_call() {
4488 let stageleft_call = q!(|x: usize| x + 1).splice_fn1_ctx(&());
4490 let result = simplify_q_macro(stageleft_call);
4491 hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
4494 }
4495
4496 #[test]
4497 fn test_closure_no_pipe_at_start() {
4498 let stageleft_call = q!({
4500 let foo = 123;
4501 move |b: usize| b + foo
4502 })
4503 .splice_fn1_ctx(&());
4504 let result = simplify_q_macro(stageleft_call);
4505 hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
4506 }
4507}