Skip to main content

hydro_lang/deploy/
deploy_graph.rs

1//! Deployment backend for Hydro that uses [`hydro_deploy`] to provision and launch services.
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::future::Future;
6use std::io::Error;
7use std::pin::Pin;
8use std::rc::Rc;
9use std::sync::Arc;
10
11use bytes::{Bytes, BytesMut};
12use dfir_lang::graph::DfirGraph;
13use futures::{Sink, SinkExt, Stream, StreamExt};
14use hydro_deploy::custom_service::CustomClientPort;
15use hydro_deploy::rust_crate::RustCrateService;
16use hydro_deploy::rust_crate::ports::{DemuxSink, RustCrateSink, RustCrateSource, TaggedSource};
17use hydro_deploy::rust_crate::tracing_options::TracingOptions;
18use hydro_deploy::{CustomService, Deployment, Host, RustCrate};
19use hydro_deploy_integration::{ConnectedSink, ConnectedSource};
20use nameof::name_of;
21use proc_macro2::Span;
22use serde::Serialize;
23use serde::de::DeserializeOwned;
24use slotmap::SparseSecondaryMap;
25use stageleft::{QuotedWithContext, RuntimeData};
26use syn::parse_quote;
27
28use super::deploy_runtime::*;
29use crate::compile::builder::ExternalPortId;
30use crate::compile::deploy_provider::{
31    ClusterSpec, Deploy, ExternalSpec, IntoProcessSpec, Node, ProcessSpec, RegisterPort,
32};
33use crate::compile::trybuild::generate::{
34    HYDRO_RUNTIME_FEATURES, LinkingMode, create_graph_trybuild,
35};
36use crate::location::dynamic::LocationId;
37use crate::location::member_id::TaglessMemberId;
38use crate::location::{LocationKey, MembershipEvent, NetworkHint};
39use crate::staging_util::get_this_crate;
40
/// Deployment backend that uses [`hydro_deploy`] for provisioning and launching.
///
/// Automatically used when you call [`crate::compile::builder::FlowBuilder::deploy`] and pass in
/// an `&mut` reference to [`hydro_deploy::Deployment`] as the deployment context.
///
/// The enum has no variants, so no value of it can ever be constructed; it exists only as the
/// type on which [`Deploy`] is implemented.
pub enum HydroDeploy {}
46
47impl<'a> Deploy<'a> for HydroDeploy {
48    /// Map from Cluster location ID to member IDs.
49    type Meta = SparseSecondaryMap<LocationKey, Vec<TaglessMemberId>>;
50    type InstantiateEnv = Deployment;
51
52    type Process = DeployNode;
53    type Cluster = DeployCluster;
54    type External = DeployExternal;
55
56    fn o2o_sink_source(
57        _env: &mut Self::InstantiateEnv,
58        _p1: &Self::Process,
59        p1_port: &<Self::Process as Node>::Port,
60        _p2: &Self::Process,
61        p2_port: &<Self::Process as Node>::Port,
62        _name: Option<&str>,
63        networking_info: &crate::networking::NetworkingInfo,
64    ) -> (syn::Expr, syn::Expr) {
65        match networking_info {
66            crate::networking::NetworkingInfo::Tcp {
67                fault: crate::networking::TcpFault::FailStop,
68            } => {}
69            _ => panic!("Unsupported networking info: {:?}", networking_info),
70        }
71        let p1_port = p1_port.as_str();
72        let p2_port = p2_port.as_str();
73        deploy_o2o(
74            RuntimeData::new("__hydro_lang_trybuild_cli"),
75            p1_port,
76            p2_port,
77        )
78    }
79
80    fn o2o_connect(
81        p1: &Self::Process,
82        p1_port: &<Self::Process as Node>::Port,
83        p2: &Self::Process,
84        p2_port: &<Self::Process as Node>::Port,
85    ) -> Box<dyn FnOnce()> {
86        let p1 = p1.clone();
87        let p1_port = p1_port.clone();
88        let p2 = p2.clone();
89        let p2_port = p2_port.clone();
90
91        Box::new(move || {
92            let self_underlying_borrow = p1.underlying.borrow();
93            let self_underlying = self_underlying_borrow.as_ref().unwrap();
94            let source_port = self_underlying.get_port(p1_port.clone());
95
96            let other_underlying_borrow = p2.underlying.borrow();
97            let other_underlying = other_underlying_borrow.as_ref().unwrap();
98            let recipient_port = other_underlying.get_port(p2_port.clone());
99
100            source_port.send_to(&recipient_port)
101        })
102    }
103
104    fn o2m_sink_source(
105        _env: &mut Self::InstantiateEnv,
106        _p1: &Self::Process,
107        p1_port: &<Self::Process as Node>::Port,
108        _c2: &Self::Cluster,
109        c2_port: &<Self::Cluster as Node>::Port,
110        _name: Option<&str>,
111        networking_info: &crate::networking::NetworkingInfo,
112    ) -> (syn::Expr, syn::Expr) {
113        match networking_info {
114            crate::networking::NetworkingInfo::Tcp {
115                fault: crate::networking::TcpFault::FailStop,
116            } => {}
117            _ => panic!("Unsupported networking info: {:?}", networking_info),
118        }
119        let p1_port = p1_port.as_str();
120        let c2_port = c2_port.as_str();
121        deploy_o2m(
122            RuntimeData::new("__hydro_lang_trybuild_cli"),
123            p1_port,
124            c2_port,
125        )
126    }
127
128    fn o2m_connect(
129        p1: &Self::Process,
130        p1_port: &<Self::Process as Node>::Port,
131        c2: &Self::Cluster,
132        c2_port: &<Self::Cluster as Node>::Port,
133    ) -> Box<dyn FnOnce()> {
134        let p1 = p1.clone();
135        let p1_port = p1_port.clone();
136        let c2 = c2.clone();
137        let c2_port = c2_port.clone();
138
139        Box::new(move || {
140            let self_underlying_borrow = p1.underlying.borrow();
141            let self_underlying = self_underlying_borrow.as_ref().unwrap();
142            let source_port = self_underlying.get_port(p1_port.clone());
143
144            let recipient_port = DemuxSink {
145                demux: c2
146                    .members
147                    .borrow()
148                    .iter()
149                    .enumerate()
150                    .map(|(id, c)| {
151                        (
152                            id as u32,
153                            Arc::new(c.underlying.get_port(c2_port.clone()))
154                                as Arc<dyn RustCrateSink + 'static>,
155                        )
156                    })
157                    .collect(),
158            };
159
160            source_port.send_to(&recipient_port)
161        })
162    }
163
164    fn m2o_sink_source(
165        _env: &mut Self::InstantiateEnv,
166        _c1: &Self::Cluster,
167        c1_port: &<Self::Cluster as Node>::Port,
168        _p2: &Self::Process,
169        p2_port: &<Self::Process as Node>::Port,
170        _name: Option<&str>,
171        networking_info: &crate::networking::NetworkingInfo,
172    ) -> (syn::Expr, syn::Expr) {
173        match networking_info {
174            crate::networking::NetworkingInfo::Tcp {
175                fault: crate::networking::TcpFault::FailStop,
176            } => {}
177            _ => panic!("Unsupported networking info: {:?}", networking_info),
178        }
179        let c1_port = c1_port.as_str();
180        let p2_port = p2_port.as_str();
181        deploy_m2o(
182            RuntimeData::new("__hydro_lang_trybuild_cli"),
183            c1_port,
184            p2_port,
185        )
186    }
187
188    fn m2o_connect(
189        c1: &Self::Cluster,
190        c1_port: &<Self::Cluster as Node>::Port,
191        p2: &Self::Process,
192        p2_port: &<Self::Process as Node>::Port,
193    ) -> Box<dyn FnOnce()> {
194        let c1 = c1.clone();
195        let c1_port = c1_port.clone();
196        let p2 = p2.clone();
197        let p2_port = p2_port.clone();
198
199        Box::new(move || {
200            let other_underlying_borrow = p2.underlying.borrow();
201            let other_underlying = other_underlying_borrow.as_ref().unwrap();
202            let recipient_port = other_underlying.get_port(p2_port.clone()).merge();
203
204            for (i, node) in c1.members.borrow().iter().enumerate() {
205                let source_port = node.underlying.get_port(c1_port.clone());
206
207                TaggedSource {
208                    source: Arc::new(source_port),
209                    tag: i as u32,
210                }
211                .send_to(&recipient_port);
212            }
213        })
214    }
215
216    fn m2m_sink_source(
217        _env: &mut Self::InstantiateEnv,
218        _c1: &Self::Cluster,
219        c1_port: &<Self::Cluster as Node>::Port,
220        _c2: &Self::Cluster,
221        c2_port: &<Self::Cluster as Node>::Port,
222        _name: Option<&str>,
223        networking_info: &crate::networking::NetworkingInfo,
224    ) -> (syn::Expr, syn::Expr) {
225        match networking_info {
226            crate::networking::NetworkingInfo::Tcp {
227                fault: crate::networking::TcpFault::FailStop,
228            } => {}
229            _ => panic!("Unsupported networking info: {:?}", networking_info),
230        }
231        let c1_port = c1_port.as_str();
232        let c2_port = c2_port.as_str();
233        deploy_m2m(
234            RuntimeData::new("__hydro_lang_trybuild_cli"),
235            c1_port,
236            c2_port,
237        )
238    }
239
240    fn m2m_connect(
241        c1: &Self::Cluster,
242        c1_port: &<Self::Cluster as Node>::Port,
243        c2: &Self::Cluster,
244        c2_port: &<Self::Cluster as Node>::Port,
245    ) -> Box<dyn FnOnce()> {
246        let c1 = c1.clone();
247        let c1_port = c1_port.clone();
248        let c2 = c2.clone();
249        let c2_port = c2_port.clone();
250
251        Box::new(move || {
252            for (i, sender) in c1.members.borrow().iter().enumerate() {
253                let source_port = sender.underlying.get_port(c1_port.clone());
254
255                let recipient_port = DemuxSink {
256                    demux: c2
257                        .members
258                        .borrow()
259                        .iter()
260                        .enumerate()
261                        .map(|(id, c)| {
262                            (
263                                id as u32,
264                                Arc::new(c.underlying.get_port(c2_port.clone()).merge())
265                                    as Arc<dyn RustCrateSink + 'static>,
266                            )
267                        })
268                        .collect(),
269                };
270
271                TaggedSource {
272                    source: Arc::new(source_port),
273                    tag: i as u32,
274                }
275                .send_to(&recipient_port);
276            }
277        })
278    }
279
280    fn e2o_many_source(
281        extra_stmts: &mut Vec<syn::Stmt>,
282        _p2: &Self::Process,
283        p2_port: &<Self::Process as Node>::Port,
284        codec_type: &syn::Type,
285        shared_handle: String,
286    ) -> syn::Expr {
287        let connect_ident = syn::Ident::new(
288            &format!("__hydro_deploy_many_{}_connect", &shared_handle),
289            Span::call_site(),
290        );
291        let source_ident = syn::Ident::new(
292            &format!("__hydro_deploy_many_{}_source", &shared_handle),
293            Span::call_site(),
294        );
295        let sink_ident = syn::Ident::new(
296            &format!("__hydro_deploy_many_{}_sink", &shared_handle),
297            Span::call_site(),
298        );
299        let membership_ident = syn::Ident::new(
300            &format!("__hydro_deploy_many_{}_membership", &shared_handle),
301            Span::call_site(),
302        );
303
304        let root = get_this_crate();
305
306        extra_stmts.push(syn::parse_quote! {
307            let #connect_ident = __hydro_lang_trybuild_cli
308                .port(#p2_port)
309                .connect::<#root::runtime_support::hydro_deploy_integration::multi_connection::ConnectedMultiConnection<_, _, #codec_type>>();
310        });
311
312        extra_stmts.push(syn::parse_quote! {
313            let #source_ident = #connect_ident.source;
314        });
315
316        extra_stmts.push(syn::parse_quote! {
317            let #sink_ident = #connect_ident.sink;
318        });
319
320        extra_stmts.push(syn::parse_quote! {
321            let #membership_ident = #connect_ident.membership;
322        });
323
324        parse_quote!(#source_ident)
325    }
326
327    fn e2o_many_sink(shared_handle: String) -> syn::Expr {
328        let sink_ident = syn::Ident::new(
329            &format!("__hydro_deploy_many_{}_sink", &shared_handle),
330            Span::call_site(),
331        );
332        parse_quote!(#sink_ident)
333    }
334
335    fn e2o_source(
336        extra_stmts: &mut Vec<syn::Stmt>,
337        _p1: &Self::External,
338        _p1_port: &<Self::External as Node>::Port,
339        _p2: &Self::Process,
340        p2_port: &<Self::Process as Node>::Port,
341        codec_type: &syn::Type,
342        shared_handle: String,
343    ) -> syn::Expr {
344        let connect_ident = syn::Ident::new(
345            &format!("__hydro_deploy_{}_connect", &shared_handle),
346            Span::call_site(),
347        );
348        let source_ident = syn::Ident::new(
349            &format!("__hydro_deploy_{}_source", &shared_handle),
350            Span::call_site(),
351        );
352        let sink_ident = syn::Ident::new(
353            &format!("__hydro_deploy_{}_sink", &shared_handle),
354            Span::call_site(),
355        );
356
357        let root = get_this_crate();
358
359        extra_stmts.push(syn::parse_quote! {
360            let #connect_ident = __hydro_lang_trybuild_cli
361                .port(#p2_port)
362                .connect::<#root::runtime_support::hydro_deploy_integration::single_connection::ConnectedSingleConnection<_, _, #codec_type>>();
363        });
364
365        extra_stmts.push(syn::parse_quote! {
366            let #source_ident = #connect_ident.source;
367        });
368
369        extra_stmts.push(syn::parse_quote! {
370            let #sink_ident = #connect_ident.sink;
371        });
372
373        parse_quote!(#source_ident)
374    }
375
376    fn e2o_connect(
377        p1: &Self::External,
378        p1_port: &<Self::External as Node>::Port,
379        p2: &Self::Process,
380        p2_port: &<Self::Process as Node>::Port,
381        _many: bool,
382        server_hint: NetworkHint,
383    ) -> Box<dyn FnOnce()> {
384        let p1 = p1.clone();
385        let p1_port = p1_port.clone();
386        let p2 = p2.clone();
387        let p2_port = p2_port.clone();
388
389        Box::new(move || {
390            let self_underlying_borrow = p1.underlying.borrow();
391            let self_underlying = self_underlying_borrow.as_ref().unwrap();
392            let source_port = self_underlying.declare_many_client();
393
394            let other_underlying_borrow = p2.underlying.borrow();
395            let other_underlying = other_underlying_borrow.as_ref().unwrap();
396            let recipient_port = other_underlying.get_port_with_hint(
397                p2_port.clone(),
398                match server_hint {
399                    NetworkHint::Auto => hydro_deploy::PortNetworkHint::Auto,
400                    NetworkHint::TcpPort(p) => hydro_deploy::PortNetworkHint::TcpPort(p),
401                },
402            );
403
404            source_port.send_to(&recipient_port);
405
406            p1.client_ports
407                .borrow_mut()
408                .insert(p1_port.clone(), source_port);
409        })
410    }
411
412    fn o2e_sink(
413        _p1: &Self::Process,
414        _p1_port: &<Self::Process as Node>::Port,
415        _p2: &Self::External,
416        _p2_port: &<Self::External as Node>::Port,
417        shared_handle: String,
418    ) -> syn::Expr {
419        let sink_ident = syn::Ident::new(
420            &format!("__hydro_deploy_{}_sink", &shared_handle),
421            Span::call_site(),
422        );
423        parse_quote!(#sink_ident)
424    }
425
426    fn cluster_ids(
427        of_cluster: LocationKey,
428    ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a {
429        cluster_members(RuntimeData::new("__hydro_lang_trybuild_cli"), of_cluster)
430    }
431
432    fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a {
433        cluster_self_id(RuntimeData::new("__hydro_lang_trybuild_cli"))
434    }
435
    /// Quotes a stream of `(member id, membership event)` pairs for the cluster at
    /// `location_id`.
    ///
    /// `_env` and `_at_location` are unused. The call in the body resolves to the free
    /// function `cluster_membership_stream` brought in by the `deploy_runtime` glob import,
    /// not to this associated function.
    fn cluster_membership_stream(
        _env: &mut Self::InstantiateEnv,
        _at_location: &LocationId,
        location_id: &LocationId,
    ) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>
    {
        cluster_membership_stream(location_id)
    }
444}
445
446#[expect(missing_docs, reason = "TODO")]
447pub trait DeployCrateWrapper {
448    fn underlying(&self) -> Arc<RustCrateService>;
449
450    fn stdout(&self) -> tokio::sync::mpsc::UnboundedReceiver<String> {
451        self.underlying().stdout()
452    }
453
454    fn stderr(&self) -> tokio::sync::mpsc::UnboundedReceiver<String> {
455        self.underlying().stderr()
456    }
457
458    fn stdout_filter(
459        &self,
460        prefix: impl Into<String>,
461    ) -> tokio::sync::mpsc::UnboundedReceiver<String> {
462        self.underlying().stdout_filter(prefix.into())
463    }
464
465    fn stderr_filter(
466        &self,
467        prefix: impl Into<String>,
468    ) -> tokio::sync::mpsc::UnboundedReceiver<String> {
469        self.underlying().stderr_filter(prefix.into())
470    }
471}
472
473#[expect(missing_docs, reason = "TODO")]
474#[derive(Clone)]
475pub struct TrybuildHost {
476    host: Arc<dyn Host>,
477    display_name: Option<String>,
478    rustflags: Option<String>,
479    profile: Option<String>,
480    additional_hydro_features: Vec<String>,
481    features: Vec<String>,
482    tracing: Option<TracingOptions>,
483    build_envs: Vec<(String, String)>,
484    env: HashMap<String, String>,
485    pin_to_core: Option<usize>,
486    name_hint: Option<String>,
487    cluster_idx: Option<usize>,
488}
489
490impl From<Arc<dyn Host>> for TrybuildHost {
491    fn from(host: Arc<dyn Host>) -> Self {
492        Self {
493            host,
494            display_name: None,
495            rustflags: None,
496            profile: None,
497            additional_hydro_features: vec![],
498            features: vec![],
499            tracing: None,
500            build_envs: vec![],
501            env: HashMap::new(),
502            pin_to_core: None,
503            name_hint: None,
504            cluster_idx: None,
505        }
506    }
507}
508
509impl<H: Host + 'static> From<Arc<H>> for TrybuildHost {
510    fn from(host: Arc<H>) -> Self {
511        Self {
512            host,
513            display_name: None,
514            rustflags: None,
515            profile: None,
516            additional_hydro_features: vec![],
517            features: vec![],
518            tracing: None,
519            build_envs: vec![],
520            env: HashMap::new(),
521            pin_to_core: None,
522            name_hint: None,
523            cluster_idx: None,
524        }
525    }
526}
527
528#[expect(missing_docs, reason = "TODO")]
529impl TrybuildHost {
530    pub fn new(host: Arc<dyn Host>) -> Self {
531        Self {
532            host,
533            display_name: None,
534            rustflags: None,
535            profile: None,
536            additional_hydro_features: vec![],
537            features: vec![],
538            tracing: None,
539            build_envs: vec![],
540            env: HashMap::new(),
541            pin_to_core: None,
542            name_hint: None,
543            cluster_idx: None,
544        }
545    }
546
547    pub fn display_name(self, display_name: impl Into<String>) -> Self {
548        if self.display_name.is_some() {
549            panic!("{} already set", name_of!(display_name in Self));
550        }
551
552        Self {
553            display_name: Some(display_name.into()),
554            ..self
555        }
556    }
557
558    pub fn rustflags(self, rustflags: impl Into<String>) -> Self {
559        if self.rustflags.is_some() {
560            panic!("{} already set", name_of!(rustflags in Self));
561        }
562
563        Self {
564            rustflags: Some(rustflags.into()),
565            ..self
566        }
567    }
568
569    pub fn profile(self, profile: impl Into<String>) -> Self {
570        if self.profile.is_some() {
571            panic!("{} already set", name_of!(profile in Self));
572        }
573
574        Self {
575            profile: Some(profile.into()),
576            ..self
577        }
578    }
579
580    pub fn additional_hydro_features(
581        mut self,
582        additional_hydro_features: impl IntoIterator<Item = impl Into<String>>,
583    ) -> Self {
584        self.additional_hydro_features
585            .extend(additional_hydro_features.into_iter().map(Into::into));
586        self
587    }
588
589    pub fn additional_hydro_feature(mut self, feature: impl Into<String>) -> Self {
590        self.additional_hydro_features.push(feature.into());
591        self
592    }
593
594    pub fn features(mut self, features: impl IntoIterator<Item = impl Into<String>>) -> Self {
595        self.features.extend(features.into_iter().map(Into::into));
596        self
597    }
598
599    pub fn feature(mut self, feature: impl Into<String>) -> Self {
600        self.features.push(feature.into());
601        self
602    }
603
604    pub fn tracing(self, tracing: TracingOptions) -> Self {
605        if self.tracing.is_some() {
606            panic!("{} already set", name_of!(tracing in Self));
607        }
608
609        Self {
610            tracing: Some(tracing),
611            ..self
612        }
613    }
614
615    pub fn build_env(self, key: impl Into<String>, value: impl Into<String>) -> Self {
616        Self {
617            build_envs: self
618                .build_envs
619                .into_iter()
620                .chain(std::iter::once((key.into(), value.into())))
621                .collect(),
622            ..self
623        }
624    }
625
626    pub fn env(self, key: impl Into<String>, value: impl Into<String>) -> Self {
627        let mut env = self.env;
628        env.insert(key.into(), value.into());
629        Self { env, ..self }
630    }
631
632    pub fn pin_to_core(self, core: usize) -> Self {
633        Self {
634            pin_to_core: Some(core),
635            ..self
636        }
637    }
638}
639
640impl IntoProcessSpec<'_, HydroDeploy> for Arc<dyn Host> {
641    type ProcessSpec = TrybuildHost;
642    fn into_process_spec(self) -> TrybuildHost {
643        TrybuildHost {
644            host: self,
645            display_name: None,
646            rustflags: None,
647            profile: None,
648            additional_hydro_features: vec![],
649            features: vec![],
650            tracing: None,
651            build_envs: vec![],
652            env: HashMap::new(),
653            pin_to_core: None,
654            name_hint: None,
655            cluster_idx: None,
656        }
657    }
658}
659
660impl<H: Host + 'static> IntoProcessSpec<'_, HydroDeploy> for Arc<H> {
661    type ProcessSpec = TrybuildHost;
662    fn into_process_spec(self) -> TrybuildHost {
663        TrybuildHost {
664            host: self,
665            display_name: None,
666            rustflags: None,
667            profile: None,
668            additional_hydro_features: vec![],
669            features: vec![],
670            tracing: None,
671            build_envs: vec![],
672            env: HashMap::new(),
673            pin_to_core: None,
674            name_hint: None,
675            cluster_idx: None,
676        }
677    }
678}
679
680#[expect(missing_docs, reason = "TODO")]
681#[derive(Clone)]
682pub struct DeployExternal {
683    next_port: Rc<RefCell<usize>>,
684    host: Arc<dyn Host>,
685    underlying: Rc<RefCell<Option<Arc<CustomService>>>>,
686    client_ports: Rc<RefCell<HashMap<String, CustomClientPort>>>,
687    allocated_ports: Rc<RefCell<HashMap<ExternalPortId, String>>>,
688}
689
690impl DeployExternal {
691    pub(crate) fn raw_port(&self, external_port_id: ExternalPortId) -> CustomClientPort {
692        self.client_ports
693            .borrow()
694            .get(
695                self.allocated_ports
696                    .borrow()
697                    .get(&external_port_id)
698                    .unwrap(),
699            )
700            .unwrap()
701            .clone()
702    }
703}
704
705impl<'a> RegisterPort<'a, HydroDeploy> for DeployExternal {
706    fn register(&self, external_port_id: ExternalPortId, port: Self::Port) {
707        assert!(
708            self.allocated_ports
709                .borrow_mut()
710                .insert(external_port_id, port.clone())
711                .is_none_or(|old| old == port)
712        );
713    }
714
715    fn as_bytes_bidi(
716        &self,
717        external_port_id: ExternalPortId,
718    ) -> impl Future<
719        Output = (
720            Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>>,
721            Pin<Box<dyn Sink<Bytes, Error = Error>>>,
722        ),
723    > + 'a {
724        let port = self.raw_port(external_port_id);
725
726        async move {
727            let (source, sink) = port.connect().await.into_source_sink();
728            (
729                Box::pin(source) as Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>>,
730                Box::pin(sink) as Pin<Box<dyn Sink<Bytes, Error = Error>>>,
731            )
732        }
733    }
734
735    fn as_bincode_bidi<InT, OutT>(
736        &self,
737        external_port_id: ExternalPortId,
738    ) -> impl Future<
739        Output = (
740            Pin<Box<dyn Stream<Item = OutT>>>,
741            Pin<Box<dyn Sink<InT, Error = Error>>>,
742        ),
743    > + 'a
744    where
745        InT: Serialize + 'static,
746        OutT: DeserializeOwned + 'static,
747    {
748        let port = self.raw_port(external_port_id);
749        async move {
750            let (source, sink) = port.connect().await.into_source_sink();
751            (
752                Box::pin(source.map(|item| bincode::deserialize(&item.unwrap()).unwrap()))
753                    as Pin<Box<dyn Stream<Item = OutT>>>,
754                Box::pin(
755                    sink.with(|item| async move { Ok(bincode::serialize(&item).unwrap().into()) }),
756                ) as Pin<Box<dyn Sink<InT, Error = Error>>>,
757            )
758        }
759    }
760
761    fn as_bincode_sink<T: Serialize + 'static>(
762        &self,
763        external_port_id: ExternalPortId,
764    ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = Error>>>> + 'a {
765        let port = self.raw_port(external_port_id);
766        async move {
767            let sink = port.connect().await.into_sink();
768            Box::pin(sink.with(|item| async move { Ok(bincode::serialize(&item).unwrap().into()) }))
769                as Pin<Box<dyn Sink<T, Error = Error>>>
770        }
771    }
772
773    fn as_bincode_source<T: DeserializeOwned + 'static>(
774        &self,
775        external_port_id: ExternalPortId,
776    ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a {
777        let port = self.raw_port(external_port_id);
778        async move {
779            let source = port.connect().await.into_source();
780            Box::pin(source.map(|item| bincode::deserialize(&item.unwrap()).unwrap()))
781                as Pin<Box<dyn Stream<Item = T>>>
782        }
783    }
784}
785
786impl Node for DeployExternal {
787    type Port = String;
788    /// Map from Cluster location ID to member IDs.
789    type Meta = SparseSecondaryMap<LocationKey, Vec<TaglessMemberId>>;
790    type InstantiateEnv = Deployment;
791
792    fn next_port(&self) -> Self::Port {
793        let next_port = *self.next_port.borrow();
794        *self.next_port.borrow_mut() += 1;
795
796        format!("port_{}", next_port)
797    }
798
799    fn instantiate(
800        &self,
801        env: &mut Self::InstantiateEnv,
802        _meta: &mut Self::Meta,
803        _graph: DfirGraph,
804        extra_stmts: &[syn::Stmt],
805        sidecars: &[syn::Expr],
806    ) {
807        assert!(extra_stmts.is_empty());
808        assert!(sidecars.is_empty());
809        let service = env.CustomService(self.host.clone(), vec![]);
810        *self.underlying.borrow_mut() = Some(service);
811    }
812
813    fn update_meta(&self, _meta: &Self::Meta) {}
814}
815
816impl ExternalSpec<'_, HydroDeploy> for Arc<dyn Host> {
817    fn build(self, _key: LocationKey, _name_hint: &str) -> DeployExternal {
818        DeployExternal {
819            next_port: Rc::new(RefCell::new(0)),
820            host: self,
821            underlying: Rc::new(RefCell::new(None)),
822            allocated_ports: Rc::new(RefCell::new(HashMap::new())),
823            client_ports: Rc::new(RefCell::new(HashMap::new())),
824        }
825    }
826}
827
828impl<H: Host + 'static> ExternalSpec<'_, HydroDeploy> for Arc<H> {
829    fn build(self, _key: LocationKey, _name_hint: &str) -> DeployExternal {
830        DeployExternal {
831            next_port: Rc::new(RefCell::new(0)),
832            host: self,
833            underlying: Rc::new(RefCell::new(None)),
834            allocated_ports: Rc::new(RefCell::new(HashMap::new())),
835            client_ports: Rc::new(RefCell::new(HashMap::new())),
836        }
837    }
838}
839
/// How the service for a node is obtained when the node is instantiated.
pub(crate) enum CrateOrTrybuild {
    /// A ready-made [`RustCrate`] together with the host it is added on; used as-is.
    Crate(RustCrate, Arc<dyn Host>),
    /// A [`TrybuildHost`] spec; the service is generated from the node's graph through
    /// trybuild at instantiation time.
    Trybuild(TrybuildHost),
}
844
845#[expect(missing_docs, reason = "TODO")]
846#[derive(Clone)]
847pub struct DeployNode {
848    next_port: Rc<RefCell<usize>>,
849    service_spec: Rc<RefCell<Option<CrateOrTrybuild>>>,
850    underlying: Rc<RefCell<Option<Arc<RustCrateService>>>>,
851}
852
853impl DeployCrateWrapper for DeployNode {
854    fn underlying(&self) -> Arc<RustCrateService> {
855        Arc::clone(self.underlying.borrow().as_ref().unwrap())
856    }
857}
858
859impl Node for DeployNode {
860    type Port = String;
861    /// Map from Cluster location ID to member IDs.
862    type Meta = SparseSecondaryMap<LocationKey, Vec<TaglessMemberId>>;
863    type InstantiateEnv = Deployment;
864
865    fn next_port(&self) -> String {
866        let next_port = *self.next_port.borrow();
867        *self.next_port.borrow_mut() += 1;
868
869        format!("port_{}", next_port)
870    }
871
872    fn update_meta(&self, meta: &Self::Meta) {
873        let underlying_node = self.underlying.borrow();
874        underlying_node.as_ref().unwrap().update_meta(HydroMeta {
875            clusters: meta.clone(),
876            cluster_id: None,
877        });
878    }
879
880    fn instantiate(
881        &self,
882        env: &mut Self::InstantiateEnv,
883        _meta: &mut Self::Meta,
884        graph: DfirGraph,
885        extra_stmts: &[syn::Stmt],
886        sidecars: &[syn::Expr],
887    ) {
888        let (service, host) = match self.service_spec.borrow_mut().take().unwrap() {
889            CrateOrTrybuild::Crate(c, host) => (c, host),
890            CrateOrTrybuild::Trybuild(trybuild) => {
891                // Determine linking mode based on host target type
892                let linking_mode = if !cfg!(target_os = "windows")
893                    && trybuild.host.target_type() == hydro_deploy::HostTargetType::Local
894                    && trybuild.rustflags.is_none()
895                {
896                    // When compiling for local, prefer dynamic linking to reduce binary size
897                    // Windows is currently not supported due to https://github.com/bevyengine/bevy/pull/2016
898                    LinkingMode::Dynamic
899                } else {
900                    LinkingMode::Static
901                };
902                let (bin_name, config) = create_graph_trybuild(
903                    graph,
904                    extra_stmts,
905                    sidecars,
906                    trybuild.name_hint.as_deref(),
907                    crate::compile::trybuild::generate::DeployMode::HydroDeploy,
908                    linking_mode,
909                );
910                let host = trybuild.host.clone();
911                (
912                    create_trybuild_service(
913                        trybuild,
914                        &config.project_dir,
915                        &config.target_dir,
916                        config.features.as_deref(),
917                        &bin_name,
918                        &config.linking_mode,
919                    ),
920                    host,
921                )
922            }
923        };
924
925        *self.underlying.borrow_mut() = Some(env.add_service(service, host));
926    }
927}
928
929#[expect(missing_docs, reason = "TODO")]
930#[derive(Clone)]
931pub struct DeployClusterNode {
932    underlying: Arc<RustCrateService>,
933}
934
935impl DeployCrateWrapper for DeployClusterNode {
936    fn underlying(&self) -> Arc<RustCrateService> {
937        self.underlying.clone()
938    }
939}
940#[expect(missing_docs, reason = "TODO")]
941#[derive(Clone)]
942pub struct DeployCluster {
943    key: LocationKey,
944    next_port: Rc<RefCell<usize>>,
945    cluster_spec: Rc<RefCell<Option<Vec<CrateOrTrybuild>>>>,
946    members: Rc<RefCell<Vec<DeployClusterNode>>>,
947    name_hint: Option<String>,
948}
949
950impl DeployCluster {
951    #[expect(missing_docs, reason = "TODO")]
952    pub fn members(&self) -> Vec<DeployClusterNode> {
953        self.members.borrow().clone()
954    }
955}
956
957impl Node for DeployCluster {
958    type Port = String;
959    /// Map from Cluster location ID to member IDs.
960    type Meta = SparseSecondaryMap<LocationKey, Vec<TaglessMemberId>>;
961    type InstantiateEnv = Deployment;
962
963    fn next_port(&self) -> String {
964        let next_port = *self.next_port.borrow();
965        *self.next_port.borrow_mut() += 1;
966
967        format!("port_{}", next_port)
968    }
969
970    fn instantiate(
971        &self,
972        env: &mut Self::InstantiateEnv,
973        meta: &mut Self::Meta,
974        graph: DfirGraph,
975        extra_stmts: &[syn::Stmt],
976        sidecars: &[syn::Expr],
977    ) {
978        let has_trybuild = self
979            .cluster_spec
980            .borrow()
981            .as_ref()
982            .unwrap()
983            .iter()
984            .any(|spec| matches!(spec, CrateOrTrybuild::Trybuild { .. }));
985
986        // For clusters, use static linking if ANY host is non-local (conservative approach)
987        let linking_mode = if !cfg!(target_os = "windows")
988            && self
989                .cluster_spec
990                .borrow()
991                .as_ref()
992                .unwrap()
993                .iter()
994                .all(|spec| match spec {
995                    CrateOrTrybuild::Crate(_, _) => true, // crates handle their own linking
996                    CrateOrTrybuild::Trybuild(t) => {
997                        t.host.target_type() == hydro_deploy::HostTargetType::Local
998                            && t.rustflags.is_none()
999                    }
1000                }) {
1001            // See comment above for Windows exception
1002            LinkingMode::Dynamic
1003        } else {
1004            LinkingMode::Static
1005        };
1006
1007        let maybe_trybuild = if has_trybuild {
1008            Some(create_graph_trybuild(
1009                graph,
1010                extra_stmts,
1011                sidecars,
1012                self.name_hint.as_deref(),
1013                crate::compile::trybuild::generate::DeployMode::HydroDeploy,
1014                linking_mode,
1015            ))
1016        } else {
1017            None
1018        };
1019
1020        let cluster_nodes = self
1021            .cluster_spec
1022            .borrow_mut()
1023            .take()
1024            .unwrap()
1025            .into_iter()
1026            .map(|spec| {
1027                let (service, host) = match spec {
1028                    CrateOrTrybuild::Crate(c, host) => (c, host),
1029                    CrateOrTrybuild::Trybuild(trybuild) => {
1030                        let (bin_name, config) = maybe_trybuild.as_ref().unwrap();
1031                        let host = trybuild.host.clone();
1032                        (
1033                            create_trybuild_service(
1034                                trybuild,
1035                                &config.project_dir,
1036                                &config.target_dir,
1037                                config.features.as_deref(),
1038                                bin_name,
1039                                &config.linking_mode,
1040                            ),
1041                            host,
1042                        )
1043                    }
1044                };
1045
1046                env.add_service(service, host)
1047            })
1048            .collect::<Vec<_>>();
1049        meta.insert(
1050            self.key,
1051            (0..(cluster_nodes.len() as u32))
1052                .map(TaglessMemberId::from_raw_id)
1053                .collect(),
1054        );
1055        *self.members.borrow_mut() = cluster_nodes
1056            .into_iter()
1057            .map(|n| DeployClusterNode { underlying: n })
1058            .collect();
1059    }
1060
1061    fn update_meta(&self, meta: &Self::Meta) {
1062        for (cluster_id, node) in self.members.borrow().iter().enumerate() {
1063            node.underlying.update_meta(HydroMeta {
1064                clusters: meta.clone(),
1065                cluster_id: Some(TaglessMemberId::from_raw_id(cluster_id as u32)),
1066            });
1067        }
1068    }
1069}
1070
1071#[expect(missing_docs, reason = "TODO")]
1072#[derive(Clone)]
1073pub struct DeployProcessSpec(RustCrate, Arc<dyn Host>);
1074
1075impl DeployProcessSpec {
1076    #[expect(missing_docs, reason = "TODO")]
1077    pub fn new(t: RustCrate, host: Arc<dyn Host>) -> Self {
1078        Self(t, host)
1079    }
1080}
1081
1082impl ProcessSpec<'_, HydroDeploy> for DeployProcessSpec {
1083    fn build(self, _key: LocationKey, _name_hint: &str) -> DeployNode {
1084        DeployNode {
1085            next_port: Rc::new(RefCell::new(0)),
1086            service_spec: Rc::new(RefCell::new(Some(CrateOrTrybuild::Crate(self.0, self.1)))),
1087            underlying: Rc::new(RefCell::new(None)),
1088        }
1089    }
1090}
1091
1092impl ProcessSpec<'_, HydroDeploy> for TrybuildHost {
1093    fn build(mut self, key: LocationKey, name_hint: &str) -> DeployNode {
1094        self.name_hint = Some(format!("{} (process {})", name_hint, key));
1095        DeployNode {
1096            next_port: Rc::new(RefCell::new(0)),
1097            service_spec: Rc::new(RefCell::new(Some(CrateOrTrybuild::Trybuild(self)))),
1098            underlying: Rc::new(RefCell::new(None)),
1099        }
1100    }
1101}
1102
1103#[expect(missing_docs, reason = "TODO")]
1104#[derive(Clone)]
1105pub struct DeployClusterSpec(Vec<(RustCrate, Arc<dyn Host>)>);
1106
1107impl DeployClusterSpec {
1108    #[expect(missing_docs, reason = "TODO")]
1109    pub fn new(crates: Vec<(RustCrate, Arc<dyn Host>)>) -> Self {
1110        Self(crates)
1111    }
1112}
1113
1114impl ClusterSpec<'_, HydroDeploy> for DeployClusterSpec {
1115    fn build(self, key: LocationKey, _name_hint: &str) -> DeployCluster {
1116        DeployCluster {
1117            key,
1118            next_port: Rc::new(RefCell::new(0)),
1119            cluster_spec: Rc::new(RefCell::new(Some(
1120                self.0
1121                    .into_iter()
1122                    .map(|(c, h)| CrateOrTrybuild::Crate(c, h))
1123                    .collect(),
1124            ))),
1125            members: Rc::new(RefCell::new(vec![])),
1126            name_hint: None,
1127        }
1128    }
1129}
1130
1131impl<T: Into<TrybuildHost>, I: IntoIterator<Item = T>> ClusterSpec<'_, HydroDeploy> for I {
1132    fn build(self, key: LocationKey, name_hint: &str) -> DeployCluster {
1133        let name_hint = format!("{} (cluster {})", name_hint, key);
1134        DeployCluster {
1135            key,
1136            next_port: Rc::new(RefCell::new(0)),
1137            cluster_spec: Rc::new(RefCell::new(Some(
1138                self.into_iter()
1139                    .enumerate()
1140                    .map(|(idx, b)| {
1141                        let mut b = b.into();
1142                        b.name_hint = Some(name_hint.clone());
1143                        b.cluster_idx = Some(idx);
1144                        CrateOrTrybuild::Trybuild(b)
1145                    })
1146                    .collect(),
1147            ))),
1148            members: Rc::new(RefCell::new(vec![])),
1149            name_hint: Some(name_hint),
1150        }
1151    }
1152}
1153
1154fn create_trybuild_service(
1155    trybuild: TrybuildHost,
1156    dir: &std::path::Path,
1157    target_dir: &std::path::PathBuf,
1158    features: Option<&[String]>,
1159    bin_name: &str,
1160    linking_mode: &LinkingMode,
1161) -> RustCrate {
1162    // For dynamic linking, use the dylib-examples crate; for static, use the base crate
1163    let crate_dir = match linking_mode {
1164        LinkingMode::Dynamic => dir.join("dylib-examples"),
1165        LinkingMode::Static => dir.to_path_buf(),
1166    };
1167
1168    let mut ret = RustCrate::new(&crate_dir, dir)
1169        .target_dir(target_dir)
1170        .example(bin_name)
1171        .no_default_features();
1172
1173    ret = ret.set_is_dylib(matches!(linking_mode, LinkingMode::Dynamic));
1174
1175    if let Some(display_name) = trybuild.display_name {
1176        ret = ret.display_name(display_name);
1177    } else if let Some(name_hint) = trybuild.name_hint {
1178        if let Some(cluster_idx) = trybuild.cluster_idx {
1179            ret = ret.display_name(format!("{} / {}", name_hint, cluster_idx));
1180        } else {
1181            ret = ret.display_name(name_hint);
1182        }
1183    }
1184
1185    if let Some(rustflags) = trybuild.rustflags {
1186        ret = ret.rustflags(rustflags);
1187    }
1188
1189    if let Some(profile) = trybuild.profile {
1190        ret = ret.profile(profile);
1191    }
1192
1193    if let Some(tracing) = trybuild.tracing {
1194        ret = ret.tracing(tracing);
1195    }
1196
1197    if let Some(core) = trybuild.pin_to_core {
1198        ret = ret.pin_to_core(core);
1199    }
1200
1201    ret = ret.features(
1202        vec!["hydro___feature_deploy_integration".to_owned()]
1203            .into_iter()
1204            .chain(
1205                trybuild
1206                    .additional_hydro_features
1207                    .into_iter()
1208                    .map(|runtime_feature| {
1209                        assert!(
1210                            HYDRO_RUNTIME_FEATURES.iter().any(|f| f == &runtime_feature),
1211                            "{runtime_feature} is not a valid Hydro runtime feature"
1212                        );
1213                        format!("hydro___feature_{runtime_feature}")
1214                    }),
1215            )
1216            .chain(trybuild.features),
1217    );
1218
1219    for (key, value) in trybuild.build_envs {
1220        ret = ret.build_env(key, value);
1221    }
1222
1223    for (key, value) in trybuild.env {
1224        ret = ret.env(key, value);
1225    }
1226
1227    ret = ret.build_env("STAGELEFT_TRYBUILD_BUILD_STAGED", "1");
1228    ret = ret.config("build.incremental = false");
1229
1230    if let Some(features) = features {
1231        ret = ret.features(features);
1232    }
1233
1234    ret
1235}