1use std::cell::RefCell;
4use std::collections::HashMap;
5use std::future::Future;
6use std::io::Error;
7use std::pin::Pin;
8use std::rc::Rc;
9use std::sync::Arc;
10
11use bytes::{Bytes, BytesMut};
12use dfir_lang::graph::DfirGraph;
13use futures::{Sink, SinkExt, Stream, StreamExt};
14use hydro_deploy::custom_service::CustomClientPort;
15use hydro_deploy::rust_crate::RustCrateService;
16use hydro_deploy::rust_crate::ports::{DemuxSink, RustCrateSink, RustCrateSource, TaggedSource};
17use hydro_deploy::rust_crate::tracing_options::TracingOptions;
18use hydro_deploy::{CustomService, Deployment, Host, RustCrate};
19use hydro_deploy_integration::{ConnectedSink, ConnectedSource};
20use nameof::name_of;
21use proc_macro2::Span;
22use serde::Serialize;
23use serde::de::DeserializeOwned;
24use slotmap::SparseSecondaryMap;
25use stageleft::{QuotedWithContext, RuntimeData};
26use syn::parse_quote;
27
28use super::deploy_runtime::*;
29use crate::compile::builder::ExternalPortId;
30use crate::compile::deploy_provider::{
31 ClusterSpec, Deploy, ExternalSpec, IntoProcessSpec, Node, ProcessSpec, RegisterPort,
32};
33use crate::compile::trybuild::generate::{
34 HYDRO_RUNTIME_FEATURES, LinkingMode, create_graph_trybuild,
35};
36use crate::location::dynamic::LocationId;
37use crate::location::member_id::TaglessMemberId;
38use crate::location::{LocationKey, MembershipEvent, NetworkHint};
39use crate::staging_util::get_this_crate;
40
/// Marker type that selects the Hydro Deploy backend.
///
/// The enum has no variants, so it can never be instantiated; it exists only
/// to carry the [`Deploy`] implementation in this file, which binds processes,
/// clusters and externals to [`DeployNode`], [`DeployCluster`] and
/// [`DeployExternal`] and uses a [`Deployment`] as the instantiation
/// environment.
pub enum HydroDeploy {}
46
47impl<'a> Deploy<'a> for HydroDeploy {
48 type Meta = SparseSecondaryMap<LocationKey, Vec<TaglessMemberId>>;
50 type InstantiateEnv = Deployment;
51
52 type Process = DeployNode;
53 type Cluster = DeployCluster;
54 type External = DeployExternal;
55
56 fn o2o_sink_source(
57 _env: &mut Self::InstantiateEnv,
58 _p1: &Self::Process,
59 p1_port: &<Self::Process as Node>::Port,
60 _p2: &Self::Process,
61 p2_port: &<Self::Process as Node>::Port,
62 _name: Option<&str>,
63 networking_info: &crate::networking::NetworkingInfo,
64 ) -> (syn::Expr, syn::Expr) {
65 match networking_info {
66 crate::networking::NetworkingInfo::Tcp {
67 fault: crate::networking::TcpFault::FailStop,
68 } => {}
69 _ => panic!("Unsupported networking info: {:?}", networking_info),
70 }
71 let p1_port = p1_port.as_str();
72 let p2_port = p2_port.as_str();
73 deploy_o2o(
74 RuntimeData::new("__hydro_lang_trybuild_cli"),
75 p1_port,
76 p2_port,
77 )
78 }
79
80 fn o2o_connect(
81 p1: &Self::Process,
82 p1_port: &<Self::Process as Node>::Port,
83 p2: &Self::Process,
84 p2_port: &<Self::Process as Node>::Port,
85 ) -> Box<dyn FnOnce()> {
86 let p1 = p1.clone();
87 let p1_port = p1_port.clone();
88 let p2 = p2.clone();
89 let p2_port = p2_port.clone();
90
91 Box::new(move || {
92 let self_underlying_borrow = p1.underlying.borrow();
93 let self_underlying = self_underlying_borrow.as_ref().unwrap();
94 let source_port = self_underlying.get_port(p1_port.clone());
95
96 let other_underlying_borrow = p2.underlying.borrow();
97 let other_underlying = other_underlying_borrow.as_ref().unwrap();
98 let recipient_port = other_underlying.get_port(p2_port.clone());
99
100 source_port.send_to(&recipient_port)
101 })
102 }
103
104 fn o2m_sink_source(
105 _env: &mut Self::InstantiateEnv,
106 _p1: &Self::Process,
107 p1_port: &<Self::Process as Node>::Port,
108 _c2: &Self::Cluster,
109 c2_port: &<Self::Cluster as Node>::Port,
110 _name: Option<&str>,
111 networking_info: &crate::networking::NetworkingInfo,
112 ) -> (syn::Expr, syn::Expr) {
113 match networking_info {
114 crate::networking::NetworkingInfo::Tcp {
115 fault: crate::networking::TcpFault::FailStop,
116 } => {}
117 _ => panic!("Unsupported networking info: {:?}", networking_info),
118 }
119 let p1_port = p1_port.as_str();
120 let c2_port = c2_port.as_str();
121 deploy_o2m(
122 RuntimeData::new("__hydro_lang_trybuild_cli"),
123 p1_port,
124 c2_port,
125 )
126 }
127
128 fn o2m_connect(
129 p1: &Self::Process,
130 p1_port: &<Self::Process as Node>::Port,
131 c2: &Self::Cluster,
132 c2_port: &<Self::Cluster as Node>::Port,
133 ) -> Box<dyn FnOnce()> {
134 let p1 = p1.clone();
135 let p1_port = p1_port.clone();
136 let c2 = c2.clone();
137 let c2_port = c2_port.clone();
138
139 Box::new(move || {
140 let self_underlying_borrow = p1.underlying.borrow();
141 let self_underlying = self_underlying_borrow.as_ref().unwrap();
142 let source_port = self_underlying.get_port(p1_port.clone());
143
144 let recipient_port = DemuxSink {
145 demux: c2
146 .members
147 .borrow()
148 .iter()
149 .enumerate()
150 .map(|(id, c)| {
151 (
152 id as u32,
153 Arc::new(c.underlying.get_port(c2_port.clone()))
154 as Arc<dyn RustCrateSink + 'static>,
155 )
156 })
157 .collect(),
158 };
159
160 source_port.send_to(&recipient_port)
161 })
162 }
163
164 fn m2o_sink_source(
165 _env: &mut Self::InstantiateEnv,
166 _c1: &Self::Cluster,
167 c1_port: &<Self::Cluster as Node>::Port,
168 _p2: &Self::Process,
169 p2_port: &<Self::Process as Node>::Port,
170 _name: Option<&str>,
171 networking_info: &crate::networking::NetworkingInfo,
172 ) -> (syn::Expr, syn::Expr) {
173 match networking_info {
174 crate::networking::NetworkingInfo::Tcp {
175 fault: crate::networking::TcpFault::FailStop,
176 } => {}
177 _ => panic!("Unsupported networking info: {:?}", networking_info),
178 }
179 let c1_port = c1_port.as_str();
180 let p2_port = p2_port.as_str();
181 deploy_m2o(
182 RuntimeData::new("__hydro_lang_trybuild_cli"),
183 c1_port,
184 p2_port,
185 )
186 }
187
188 fn m2o_connect(
189 c1: &Self::Cluster,
190 c1_port: &<Self::Cluster as Node>::Port,
191 p2: &Self::Process,
192 p2_port: &<Self::Process as Node>::Port,
193 ) -> Box<dyn FnOnce()> {
194 let c1 = c1.clone();
195 let c1_port = c1_port.clone();
196 let p2 = p2.clone();
197 let p2_port = p2_port.clone();
198
199 Box::new(move || {
200 let other_underlying_borrow = p2.underlying.borrow();
201 let other_underlying = other_underlying_borrow.as_ref().unwrap();
202 let recipient_port = other_underlying.get_port(p2_port.clone()).merge();
203
204 for (i, node) in c1.members.borrow().iter().enumerate() {
205 let source_port = node.underlying.get_port(c1_port.clone());
206
207 TaggedSource {
208 source: Arc::new(source_port),
209 tag: i as u32,
210 }
211 .send_to(&recipient_port);
212 }
213 })
214 }
215
216 fn m2m_sink_source(
217 _env: &mut Self::InstantiateEnv,
218 _c1: &Self::Cluster,
219 c1_port: &<Self::Cluster as Node>::Port,
220 _c2: &Self::Cluster,
221 c2_port: &<Self::Cluster as Node>::Port,
222 _name: Option<&str>,
223 networking_info: &crate::networking::NetworkingInfo,
224 ) -> (syn::Expr, syn::Expr) {
225 match networking_info {
226 crate::networking::NetworkingInfo::Tcp {
227 fault: crate::networking::TcpFault::FailStop,
228 } => {}
229 _ => panic!("Unsupported networking info: {:?}", networking_info),
230 }
231 let c1_port = c1_port.as_str();
232 let c2_port = c2_port.as_str();
233 deploy_m2m(
234 RuntimeData::new("__hydro_lang_trybuild_cli"),
235 c1_port,
236 c2_port,
237 )
238 }
239
240 fn m2m_connect(
241 c1: &Self::Cluster,
242 c1_port: &<Self::Cluster as Node>::Port,
243 c2: &Self::Cluster,
244 c2_port: &<Self::Cluster as Node>::Port,
245 ) -> Box<dyn FnOnce()> {
246 let c1 = c1.clone();
247 let c1_port = c1_port.clone();
248 let c2 = c2.clone();
249 let c2_port = c2_port.clone();
250
251 Box::new(move || {
252 for (i, sender) in c1.members.borrow().iter().enumerate() {
253 let source_port = sender.underlying.get_port(c1_port.clone());
254
255 let recipient_port = DemuxSink {
256 demux: c2
257 .members
258 .borrow()
259 .iter()
260 .enumerate()
261 .map(|(id, c)| {
262 (
263 id as u32,
264 Arc::new(c.underlying.get_port(c2_port.clone()).merge())
265 as Arc<dyn RustCrateSink + 'static>,
266 )
267 })
268 .collect(),
269 };
270
271 TaggedSource {
272 source: Arc::new(source_port),
273 tag: i as u32,
274 }
275 .send_to(&recipient_port);
276 }
277 })
278 }
279
280 fn e2o_many_source(
281 extra_stmts: &mut Vec<syn::Stmt>,
282 _p2: &Self::Process,
283 p2_port: &<Self::Process as Node>::Port,
284 codec_type: &syn::Type,
285 shared_handle: String,
286 ) -> syn::Expr {
287 let connect_ident = syn::Ident::new(
288 &format!("__hydro_deploy_many_{}_connect", &shared_handle),
289 Span::call_site(),
290 );
291 let source_ident = syn::Ident::new(
292 &format!("__hydro_deploy_many_{}_source", &shared_handle),
293 Span::call_site(),
294 );
295 let sink_ident = syn::Ident::new(
296 &format!("__hydro_deploy_many_{}_sink", &shared_handle),
297 Span::call_site(),
298 );
299 let membership_ident = syn::Ident::new(
300 &format!("__hydro_deploy_many_{}_membership", &shared_handle),
301 Span::call_site(),
302 );
303
304 let root = get_this_crate();
305
306 extra_stmts.push(syn::parse_quote! {
307 let #connect_ident = __hydro_lang_trybuild_cli
308 .port(#p2_port)
309 .connect::<#root::runtime_support::hydro_deploy_integration::multi_connection::ConnectedMultiConnection<_, _, #codec_type>>();
310 });
311
312 extra_stmts.push(syn::parse_quote! {
313 let #source_ident = #connect_ident.source;
314 });
315
316 extra_stmts.push(syn::parse_quote! {
317 let #sink_ident = #connect_ident.sink;
318 });
319
320 extra_stmts.push(syn::parse_quote! {
321 let #membership_ident = #connect_ident.membership;
322 });
323
324 parse_quote!(#source_ident)
325 }
326
327 fn e2o_many_sink(shared_handle: String) -> syn::Expr {
328 let sink_ident = syn::Ident::new(
329 &format!("__hydro_deploy_many_{}_sink", &shared_handle),
330 Span::call_site(),
331 );
332 parse_quote!(#sink_ident)
333 }
334
335 fn e2o_source(
336 extra_stmts: &mut Vec<syn::Stmt>,
337 _p1: &Self::External,
338 _p1_port: &<Self::External as Node>::Port,
339 _p2: &Self::Process,
340 p2_port: &<Self::Process as Node>::Port,
341 codec_type: &syn::Type,
342 shared_handle: String,
343 ) -> syn::Expr {
344 let connect_ident = syn::Ident::new(
345 &format!("__hydro_deploy_{}_connect", &shared_handle),
346 Span::call_site(),
347 );
348 let source_ident = syn::Ident::new(
349 &format!("__hydro_deploy_{}_source", &shared_handle),
350 Span::call_site(),
351 );
352 let sink_ident = syn::Ident::new(
353 &format!("__hydro_deploy_{}_sink", &shared_handle),
354 Span::call_site(),
355 );
356
357 let root = get_this_crate();
358
359 extra_stmts.push(syn::parse_quote! {
360 let #connect_ident = __hydro_lang_trybuild_cli
361 .port(#p2_port)
362 .connect::<#root::runtime_support::hydro_deploy_integration::single_connection::ConnectedSingleConnection<_, _, #codec_type>>();
363 });
364
365 extra_stmts.push(syn::parse_quote! {
366 let #source_ident = #connect_ident.source;
367 });
368
369 extra_stmts.push(syn::parse_quote! {
370 let #sink_ident = #connect_ident.sink;
371 });
372
373 parse_quote!(#source_ident)
374 }
375
376 fn e2o_connect(
377 p1: &Self::External,
378 p1_port: &<Self::External as Node>::Port,
379 p2: &Self::Process,
380 p2_port: &<Self::Process as Node>::Port,
381 _many: bool,
382 server_hint: NetworkHint,
383 ) -> Box<dyn FnOnce()> {
384 let p1 = p1.clone();
385 let p1_port = p1_port.clone();
386 let p2 = p2.clone();
387 let p2_port = p2_port.clone();
388
389 Box::new(move || {
390 let self_underlying_borrow = p1.underlying.borrow();
391 let self_underlying = self_underlying_borrow.as_ref().unwrap();
392 let source_port = self_underlying.declare_many_client();
393
394 let other_underlying_borrow = p2.underlying.borrow();
395 let other_underlying = other_underlying_borrow.as_ref().unwrap();
396 let recipient_port = other_underlying.get_port_with_hint(
397 p2_port.clone(),
398 match server_hint {
399 NetworkHint::Auto => hydro_deploy::PortNetworkHint::Auto,
400 NetworkHint::TcpPort(p) => hydro_deploy::PortNetworkHint::TcpPort(p),
401 },
402 );
403
404 source_port.send_to(&recipient_port);
405
406 p1.client_ports
407 .borrow_mut()
408 .insert(p1_port.clone(), source_port);
409 })
410 }
411
412 fn o2e_sink(
413 _p1: &Self::Process,
414 _p1_port: &<Self::Process as Node>::Port,
415 _p2: &Self::External,
416 _p2_port: &<Self::External as Node>::Port,
417 shared_handle: String,
418 ) -> syn::Expr {
419 let sink_ident = syn::Ident::new(
420 &format!("__hydro_deploy_{}_sink", &shared_handle),
421 Span::call_site(),
422 );
423 parse_quote!(#sink_ident)
424 }
425
426 fn cluster_ids(
427 of_cluster: LocationKey,
428 ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a {
429 cluster_members(RuntimeData::new("__hydro_lang_trybuild_cli"), of_cluster)
430 }
431
432 fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a {
433 cluster_self_id(RuntimeData::new("__hydro_lang_trybuild_cli"))
434 }
435
436 fn cluster_membership_stream(
437 _env: &mut Self::InstantiateEnv,
438 _at_location: &LocationId,
439 location_id: &LocationId,
440 ) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>
441 {
442 cluster_membership_stream(location_id)
443 }
444}
445
// Convenience accessors for nodes that are backed by a `RustCrateService`.
// Implementors only supply `underlying`; every other method forwards to the
// service returned by it.
#[expect(missing_docs, reason = "TODO")]
pub trait DeployCrateWrapper {
    // Returns a handle to the service backing this node.
    fn underlying(&self) -> Arc<RustCrateService>;

    // Forwards to `RustCrateService::stdout` on the underlying service.
    fn stdout(&self) -> tokio::sync::mpsc::UnboundedReceiver<String> {
        self.underlying().stdout()
    }

    // Forwards to `RustCrateService::stderr` on the underlying service.
    fn stderr(&self) -> tokio::sync::mpsc::UnboundedReceiver<String> {
        self.underlying().stderr()
    }

    // Forwards to `RustCrateService::stdout_filter`, converting `prefix` to an
    // owned `String` first.
    fn stdout_filter(
        &self,
        prefix: impl Into<String>,
    ) -> tokio::sync::mpsc::UnboundedReceiver<String> {
        self.underlying().stdout_filter(prefix.into())
    }

    // Forwards to `RustCrateService::stderr_filter`, converting `prefix` to an
    // owned `String` first.
    fn stderr_filter(
        &self,
        prefix: impl Into<String>,
    ) -> tokio::sync::mpsc::UnboundedReceiver<String> {
        self.underlying().stderr_filter(prefix.into())
    }
}
472
// A host plus the build/run settings used when the service for a location is
// generated through trybuild (see `create_trybuild_service`).
#[expect(missing_docs, reason = "TODO")]
#[derive(Clone)]
pub struct TrybuildHost {
    // Host the generated service is added to.
    host: Arc<dyn Host>,
    // Explicit display name; takes precedence over `name_hint`.
    display_name: Option<String>,
    // Forwarded to `RustCrate::rustflags`; when set, static linking is used.
    rustflags: Option<String>,
    // Forwarded to `RustCrate::profile`.
    profile: Option<String>,
    // Hydro runtime feature names; validated against `HYDRO_RUNTIME_FEATURES`
    // and prefixed with `hydro___feature_` when the service is built.
    additional_hydro_features: Vec<String>,
    // Extra features forwarded verbatim to `RustCrate::features`.
    features: Vec<String>,
    // Forwarded to `RustCrate::tracing`.
    tracing: Option<TracingOptions>,
    // Key/value pairs forwarded to `RustCrate::build_env`, in insertion order.
    build_envs: Vec<(String, String)>,
    // Key/value pairs forwarded to `RustCrate::env`.
    env: HashMap<String, String>,
    // Forwarded to `RustCrate::pin_to_core`.
    pin_to_core: Option<usize>,
    // Filled in by the `ProcessSpec`/`ClusterSpec` `build` implementations.
    name_hint: Option<String>,
    // Index within the cluster; set only by the `ClusterSpec` implementation.
    cluster_idx: Option<usize>,
}
489
490impl From<Arc<dyn Host>> for TrybuildHost {
491 fn from(host: Arc<dyn Host>) -> Self {
492 Self {
493 host,
494 display_name: None,
495 rustflags: None,
496 profile: None,
497 additional_hydro_features: vec![],
498 features: vec![],
499 tracing: None,
500 build_envs: vec![],
501 env: HashMap::new(),
502 pin_to_core: None,
503 name_hint: None,
504 cluster_idx: None,
505 }
506 }
507}
508
509impl<H: Host + 'static> From<Arc<H>> for TrybuildHost {
510 fn from(host: Arc<H>) -> Self {
511 Self {
512 host,
513 display_name: None,
514 rustflags: None,
515 profile: None,
516 additional_hydro_features: vec![],
517 features: vec![],
518 tracing: None,
519 build_envs: vec![],
520 env: HashMap::new(),
521 pin_to_core: None,
522 name_hint: None,
523 cluster_idx: None,
524 }
525 }
526}
527
528#[expect(missing_docs, reason = "TODO")]
529impl TrybuildHost {
530 pub fn new(host: Arc<dyn Host>) -> Self {
531 Self {
532 host,
533 display_name: None,
534 rustflags: None,
535 profile: None,
536 additional_hydro_features: vec![],
537 features: vec![],
538 tracing: None,
539 build_envs: vec![],
540 env: HashMap::new(),
541 pin_to_core: None,
542 name_hint: None,
543 cluster_idx: None,
544 }
545 }
546
547 pub fn display_name(self, display_name: impl Into<String>) -> Self {
548 if self.display_name.is_some() {
549 panic!("{} already set", name_of!(display_name in Self));
550 }
551
552 Self {
553 display_name: Some(display_name.into()),
554 ..self
555 }
556 }
557
558 pub fn rustflags(self, rustflags: impl Into<String>) -> Self {
559 if self.rustflags.is_some() {
560 panic!("{} already set", name_of!(rustflags in Self));
561 }
562
563 Self {
564 rustflags: Some(rustflags.into()),
565 ..self
566 }
567 }
568
569 pub fn profile(self, profile: impl Into<String>) -> Self {
570 if self.profile.is_some() {
571 panic!("{} already set", name_of!(profile in Self));
572 }
573
574 Self {
575 profile: Some(profile.into()),
576 ..self
577 }
578 }
579
580 pub fn additional_hydro_features(
581 mut self,
582 additional_hydro_features: impl IntoIterator<Item = impl Into<String>>,
583 ) -> Self {
584 self.additional_hydro_features
585 .extend(additional_hydro_features.into_iter().map(Into::into));
586 self
587 }
588
589 pub fn additional_hydro_feature(mut self, feature: impl Into<String>) -> Self {
590 self.additional_hydro_features.push(feature.into());
591 self
592 }
593
594 pub fn features(mut self, features: impl IntoIterator<Item = impl Into<String>>) -> Self {
595 self.features.extend(features.into_iter().map(Into::into));
596 self
597 }
598
599 pub fn feature(mut self, feature: impl Into<String>) -> Self {
600 self.features.push(feature.into());
601 self
602 }
603
604 pub fn tracing(self, tracing: TracingOptions) -> Self {
605 if self.tracing.is_some() {
606 panic!("{} already set", name_of!(tracing in Self));
607 }
608
609 Self {
610 tracing: Some(tracing),
611 ..self
612 }
613 }
614
615 pub fn build_env(self, key: impl Into<String>, value: impl Into<String>) -> Self {
616 Self {
617 build_envs: self
618 .build_envs
619 .into_iter()
620 .chain(std::iter::once((key.into(), value.into())))
621 .collect(),
622 ..self
623 }
624 }
625
626 pub fn env(self, key: impl Into<String>, value: impl Into<String>) -> Self {
627 let mut env = self.env;
628 env.insert(key.into(), value.into());
629 Self { env, ..self }
630 }
631
632 pub fn pin_to_core(self, core: usize) -> Self {
633 Self {
634 pin_to_core: Some(core),
635 ..self
636 }
637 }
638}
639
640impl IntoProcessSpec<'_, HydroDeploy> for Arc<dyn Host> {
641 type ProcessSpec = TrybuildHost;
642 fn into_process_spec(self) -> TrybuildHost {
643 TrybuildHost {
644 host: self,
645 display_name: None,
646 rustflags: None,
647 profile: None,
648 additional_hydro_features: vec![],
649 features: vec![],
650 tracing: None,
651 build_envs: vec![],
652 env: HashMap::new(),
653 pin_to_core: None,
654 name_hint: None,
655 cluster_idx: None,
656 }
657 }
658}
659
660impl<H: Host + 'static> IntoProcessSpec<'_, HydroDeploy> for Arc<H> {
661 type ProcessSpec = TrybuildHost;
662 fn into_process_spec(self) -> TrybuildHost {
663 TrybuildHost {
664 host: self,
665 display_name: None,
666 rustflags: None,
667 profile: None,
668 additional_hydro_features: vec![],
669 features: vec![],
670 tracing: None,
671 build_envs: vec![],
672 env: HashMap::new(),
673 pin_to_core: None,
674 name_hint: None,
675 cluster_idx: None,
676 }
677 }
678}
679
// Handle for an external (non-Hydro) participant. Cloning shares all state
// because every mutable field sits behind an `Rc<RefCell<..>>`.
#[expect(missing_docs, reason = "TODO")]
#[derive(Clone)]
pub struct DeployExternal {
    // Counter used by `next_port` to produce `port_<n>` names.
    next_port: Rc<RefCell<usize>>,
    // Host the custom service is created on during `instantiate`.
    host: Arc<dyn Host>,
    // `None` until `instantiate` has run.
    underlying: Rc<RefCell<Option<Arc<CustomService>>>>,
    // Client ports keyed by port name; filled by the `e2o_connect` closure.
    client_ports: Rc<RefCell<HashMap<String, CustomClientPort>>>,
    // External port id -> port name; filled by `RegisterPort::register`.
    allocated_ports: Rc<RefCell<HashMap<ExternalPortId, String>>>,
}
689
690impl DeployExternal {
691 pub(crate) fn raw_port(&self, external_port_id: ExternalPortId) -> CustomClientPort {
692 self.client_ports
693 .borrow()
694 .get(
695 self.allocated_ports
696 .borrow()
697 .get(&external_port_id)
698 .unwrap(),
699 )
700 .unwrap()
701 .clone()
702 }
703}
704
impl<'a> RegisterPort<'a, HydroDeploy> for DeployExternal {
    /// Records that `external_port_id` is served by the port named `port`.
    ///
    /// Re-registering an id is accepted only when the name is unchanged;
    /// otherwise the assertion fails.
    fn register(&self, external_port_id: ExternalPortId, port: Self::Port) {
        assert!(
            self.allocated_ports
                .borrow_mut()
                .insert(external_port_id, port.clone())
                .is_none_or(|old| old == port)
        );
    }

    /// Connects the port behind `external_port_id` and returns its raw byte
    /// stream and byte sink.
    ///
    /// The port lookup (`raw_port`) happens eagerly, before the future is
    /// returned, and panics if the port is unknown.
    fn as_bytes_bidi(
        &self,
        external_port_id: ExternalPortId,
    ) -> impl Future<
        Output = (
            Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>>,
            Pin<Box<dyn Sink<Bytes, Error = Error>>>,
        ),
    > + 'a {
        let port = self.raw_port(external_port_id);

        async move {
            let (source, sink) = port.connect().await.into_source_sink();
            (
                Box::pin(source) as Pin<Box<dyn Stream<Item = Result<BytesMut, Error>>>>,
                Box::pin(sink) as Pin<Box<dyn Sink<Bytes, Error = Error>>>,
            )
        }
    }

    /// Like `as_bytes_bidi`, but bincode-decodes incoming items into `OutT`
    /// and bincode-encodes outgoing `InT` values.
    ///
    /// Transport errors and (de)serialization failures are unwrapped, so they
    /// surface as panics rather than as stream/sink errors.
    fn as_bincode_bidi<InT, OutT>(
        &self,
        external_port_id: ExternalPortId,
    ) -> impl Future<
        Output = (
            Pin<Box<dyn Stream<Item = OutT>>>,
            Pin<Box<dyn Sink<InT, Error = Error>>>,
        ),
    > + 'a
    where
        InT: Serialize + 'static,
        OutT: DeserializeOwned + 'static,
    {
        let port = self.raw_port(external_port_id);
        async move {
            let (source, sink) = port.connect().await.into_source_sink();
            (
                Box::pin(source.map(|item| bincode::deserialize(&item.unwrap()).unwrap()))
                    as Pin<Box<dyn Stream<Item = OutT>>>,
                Box::pin(
                    sink.with(|item| async move { Ok(bincode::serialize(&item).unwrap().into()) }),
                ) as Pin<Box<dyn Sink<InT, Error = Error>>>,
            )
        }
    }

    /// Connects the port and returns only a sink that bincode-encodes `T`.
    ///
    /// Serialization failures are unwrapped and therefore panic.
    fn as_bincode_sink<T: Serialize + 'static>(
        &self,
        external_port_id: ExternalPortId,
    ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = Error>>>> + 'a {
        let port = self.raw_port(external_port_id);
        async move {
            let sink = port.connect().await.into_sink();
            Box::pin(sink.with(|item| async move { Ok(bincode::serialize(&item).unwrap().into()) }))
                as Pin<Box<dyn Sink<T, Error = Error>>>
        }
    }

    /// Connects the port and returns only a stream that bincode-decodes `T`.
    ///
    /// Transport errors and deserialization failures are unwrapped and
    /// therefore panic.
    fn as_bincode_source<T: DeserializeOwned + 'static>(
        &self,
        external_port_id: ExternalPortId,
    ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a {
        let port = self.raw_port(external_port_id);
        async move {
            let source = port.connect().await.into_source();
            Box::pin(source.map(|item| bincode::deserialize(&item.unwrap()).unwrap()))
                as Pin<Box<dyn Stream<Item = T>>>
        }
    }
}
785
786impl Node for DeployExternal {
787 type Port = String;
788 type Meta = SparseSecondaryMap<LocationKey, Vec<TaglessMemberId>>;
790 type InstantiateEnv = Deployment;
791
792 fn next_port(&self) -> Self::Port {
793 let next_port = *self.next_port.borrow();
794 *self.next_port.borrow_mut() += 1;
795
796 format!("port_{}", next_port)
797 }
798
799 fn instantiate(
800 &self,
801 env: &mut Self::InstantiateEnv,
802 _meta: &mut Self::Meta,
803 _graph: DfirGraph,
804 extra_stmts: &[syn::Stmt],
805 sidecars: &[syn::Expr],
806 ) {
807 assert!(extra_stmts.is_empty());
808 assert!(sidecars.is_empty());
809 let service = env.CustomService(self.host.clone(), vec![]);
810 *self.underlying.borrow_mut() = Some(service);
811 }
812
813 fn update_meta(&self, _meta: &Self::Meta) {}
814}
815
816impl ExternalSpec<'_, HydroDeploy> for Arc<dyn Host> {
817 fn build(self, _key: LocationKey, _name_hint: &str) -> DeployExternal {
818 DeployExternal {
819 next_port: Rc::new(RefCell::new(0)),
820 host: self,
821 underlying: Rc::new(RefCell::new(None)),
822 allocated_ports: Rc::new(RefCell::new(HashMap::new())),
823 client_ports: Rc::new(RefCell::new(HashMap::new())),
824 }
825 }
826}
827
828impl<H: Host + 'static> ExternalSpec<'_, HydroDeploy> for Arc<H> {
829 fn build(self, _key: LocationKey, _name_hint: &str) -> DeployExternal {
830 DeployExternal {
831 next_port: Rc::new(RefCell::new(0)),
832 host: self,
833 underlying: Rc::new(RefCell::new(None)),
834 allocated_ports: Rc::new(RefCell::new(HashMap::new())),
835 client_ports: Rc::new(RefCell::new(HashMap::new())),
836 }
837 }
838}
839
/// How the service for a location is obtained at instantiation time.
pub(crate) enum CrateOrTrybuild {
    /// A caller-supplied [`RustCrate`] together with the host to add it to.
    Crate(RustCrate, Arc<dyn Host>),
    /// A host description from which the crate is generated via
    /// `create_graph_trybuild` / `create_trybuild_service`.
    Trybuild(TrybuildHost),
}
844
// Handle for a single process. Cloning shares all state because every field
// sits behind an `Rc<RefCell<..>>`.
#[expect(missing_docs, reason = "TODO")]
#[derive(Clone)]
pub struct DeployNode {
    // Counter used by `next_port` to produce `port_<n>` names.
    next_port: Rc<RefCell<usize>>,
    // Taken (left as `None`) by `instantiate`, so it can only run once.
    service_spec: Rc<RefCell<Option<CrateOrTrybuild>>>,
    // `None` until `instantiate` has added the service to the deployment.
    underlying: Rc<RefCell<Option<Arc<RustCrateService>>>>,
}
852
853impl DeployCrateWrapper for DeployNode {
854 fn underlying(&self) -> Arc<RustCrateService> {
855 Arc::clone(self.underlying.borrow().as_ref().unwrap())
856 }
857}
858
859impl Node for DeployNode {
860 type Port = String;
861 type Meta = SparseSecondaryMap<LocationKey, Vec<TaglessMemberId>>;
863 type InstantiateEnv = Deployment;
864
865 fn next_port(&self) -> String {
866 let next_port = *self.next_port.borrow();
867 *self.next_port.borrow_mut() += 1;
868
869 format!("port_{}", next_port)
870 }
871
872 fn update_meta(&self, meta: &Self::Meta) {
873 let underlying_node = self.underlying.borrow();
874 underlying_node.as_ref().unwrap().update_meta(HydroMeta {
875 clusters: meta.clone(),
876 cluster_id: None,
877 });
878 }
879
880 fn instantiate(
881 &self,
882 env: &mut Self::InstantiateEnv,
883 _meta: &mut Self::Meta,
884 graph: DfirGraph,
885 extra_stmts: &[syn::Stmt],
886 sidecars: &[syn::Expr],
887 ) {
888 let (service, host) = match self.service_spec.borrow_mut().take().unwrap() {
889 CrateOrTrybuild::Crate(c, host) => (c, host),
890 CrateOrTrybuild::Trybuild(trybuild) => {
891 let linking_mode = if !cfg!(target_os = "windows")
893 && trybuild.host.target_type() == hydro_deploy::HostTargetType::Local
894 && trybuild.rustflags.is_none()
895 {
896 LinkingMode::Dynamic
899 } else {
900 LinkingMode::Static
901 };
902 let (bin_name, config) = create_graph_trybuild(
903 graph,
904 extra_stmts,
905 sidecars,
906 trybuild.name_hint.as_deref(),
907 crate::compile::trybuild::generate::DeployMode::HydroDeploy,
908 linking_mode,
909 );
910 let host = trybuild.host.clone();
911 (
912 create_trybuild_service(
913 trybuild,
914 &config.project_dir,
915 &config.target_dir,
916 config.features.as_deref(),
917 &bin_name,
918 &config.linking_mode,
919 ),
920 host,
921 )
922 }
923 };
924
925 *self.underlying.borrow_mut() = Some(env.add_service(service, host));
926 }
927}
928
// One instantiated member of a `DeployCluster`.
#[expect(missing_docs, reason = "TODO")]
#[derive(Clone)]
pub struct DeployClusterNode {
    // Service that was added to the deployment for this member.
    underlying: Arc<RustCrateService>,
}
934
935impl DeployCrateWrapper for DeployClusterNode {
936 fn underlying(&self) -> Arc<RustCrateService> {
937 self.underlying.clone()
938 }
939}
// Handle for a cluster of processes. Cloning shares the port counter, spec and
// member list because they sit behind `Rc<RefCell<..>>`.
#[expect(missing_docs, reason = "TODO")]
#[derive(Clone)]
pub struct DeployCluster {
    // Key under which this cluster's member ids are stored in the metadata map.
    key: LocationKey,
    // Counter used by `next_port` to produce `port_<n>` names.
    next_port: Rc<RefCell<usize>>,
    // One spec per member; taken (left as `None`) by `instantiate`.
    cluster_spec: Rc<RefCell<Option<Vec<CrateOrTrybuild>>>>,
    // Empty until `instantiate` has run; then one entry per member, in spec order.
    members: Rc<RefCell<Vec<DeployClusterNode>>>,
    // Passed to `create_graph_trybuild` when any member is built via trybuild.
    name_hint: Option<String>,
}
949
950impl DeployCluster {
951 #[expect(missing_docs, reason = "TODO")]
952 pub fn members(&self) -> Vec<DeployClusterNode> {
953 self.members.borrow().clone()
954 }
955}
956
957impl Node for DeployCluster {
958 type Port = String;
959 type Meta = SparseSecondaryMap<LocationKey, Vec<TaglessMemberId>>;
961 type InstantiateEnv = Deployment;
962
963 fn next_port(&self) -> String {
964 let next_port = *self.next_port.borrow();
965 *self.next_port.borrow_mut() += 1;
966
967 format!("port_{}", next_port)
968 }
969
970 fn instantiate(
971 &self,
972 env: &mut Self::InstantiateEnv,
973 meta: &mut Self::Meta,
974 graph: DfirGraph,
975 extra_stmts: &[syn::Stmt],
976 sidecars: &[syn::Expr],
977 ) {
978 let has_trybuild = self
979 .cluster_spec
980 .borrow()
981 .as_ref()
982 .unwrap()
983 .iter()
984 .any(|spec| matches!(spec, CrateOrTrybuild::Trybuild { .. }));
985
986 let linking_mode = if !cfg!(target_os = "windows")
988 && self
989 .cluster_spec
990 .borrow()
991 .as_ref()
992 .unwrap()
993 .iter()
994 .all(|spec| match spec {
995 CrateOrTrybuild::Crate(_, _) => true, CrateOrTrybuild::Trybuild(t) => {
997 t.host.target_type() == hydro_deploy::HostTargetType::Local
998 && t.rustflags.is_none()
999 }
1000 }) {
1001 LinkingMode::Dynamic
1003 } else {
1004 LinkingMode::Static
1005 };
1006
1007 let maybe_trybuild = if has_trybuild {
1008 Some(create_graph_trybuild(
1009 graph,
1010 extra_stmts,
1011 sidecars,
1012 self.name_hint.as_deref(),
1013 crate::compile::trybuild::generate::DeployMode::HydroDeploy,
1014 linking_mode,
1015 ))
1016 } else {
1017 None
1018 };
1019
1020 let cluster_nodes = self
1021 .cluster_spec
1022 .borrow_mut()
1023 .take()
1024 .unwrap()
1025 .into_iter()
1026 .map(|spec| {
1027 let (service, host) = match spec {
1028 CrateOrTrybuild::Crate(c, host) => (c, host),
1029 CrateOrTrybuild::Trybuild(trybuild) => {
1030 let (bin_name, config) = maybe_trybuild.as_ref().unwrap();
1031 let host = trybuild.host.clone();
1032 (
1033 create_trybuild_service(
1034 trybuild,
1035 &config.project_dir,
1036 &config.target_dir,
1037 config.features.as_deref(),
1038 bin_name,
1039 &config.linking_mode,
1040 ),
1041 host,
1042 )
1043 }
1044 };
1045
1046 env.add_service(service, host)
1047 })
1048 .collect::<Vec<_>>();
1049 meta.insert(
1050 self.key,
1051 (0..(cluster_nodes.len() as u32))
1052 .map(TaglessMemberId::from_raw_id)
1053 .collect(),
1054 );
1055 *self.members.borrow_mut() = cluster_nodes
1056 .into_iter()
1057 .map(|n| DeployClusterNode { underlying: n })
1058 .collect();
1059 }
1060
1061 fn update_meta(&self, meta: &Self::Meta) {
1062 for (cluster_id, node) in self.members.borrow().iter().enumerate() {
1063 node.underlying.update_meta(HydroMeta {
1064 clusters: meta.clone(),
1065 cluster_id: Some(TaglessMemberId::from_raw_id(cluster_id as u32)),
1066 });
1067 }
1068 }
1069}
1070
// Process spec made of a caller-supplied `RustCrate` and the host it is added
// to; its `ProcessSpec::build` wraps the pair in `CrateOrTrybuild::Crate`.
#[expect(missing_docs, reason = "TODO")]
#[derive(Clone)]
pub struct DeployProcessSpec(RustCrate, Arc<dyn Host>);
1074
impl DeployProcessSpec {
    // Pairs the crate description `t` with the `host` it will be added to.
    #[expect(missing_docs, reason = "TODO")]
    pub fn new(t: RustCrate, host: Arc<dyn Host>) -> Self {
        Self(t, host)
    }
}
1081
1082impl ProcessSpec<'_, HydroDeploy> for DeployProcessSpec {
1083 fn build(self, _key: LocationKey, _name_hint: &str) -> DeployNode {
1084 DeployNode {
1085 next_port: Rc::new(RefCell::new(0)),
1086 service_spec: Rc::new(RefCell::new(Some(CrateOrTrybuild::Crate(self.0, self.1)))),
1087 underlying: Rc::new(RefCell::new(None)),
1088 }
1089 }
1090}
1091
1092impl ProcessSpec<'_, HydroDeploy> for TrybuildHost {
1093 fn build(mut self, key: LocationKey, name_hint: &str) -> DeployNode {
1094 self.name_hint = Some(format!("{} (process {})", name_hint, key));
1095 DeployNode {
1096 next_port: Rc::new(RefCell::new(0)),
1097 service_spec: Rc::new(RefCell::new(Some(CrateOrTrybuild::Trybuild(self)))),
1098 underlying: Rc::new(RefCell::new(None)),
1099 }
1100 }
1101}
1102
// Cluster spec made of caller-supplied `(RustCrate, host)` pairs, one per
// member; its `ClusterSpec::build` wraps each pair in `CrateOrTrybuild::Crate`.
#[expect(missing_docs, reason = "TODO")]
#[derive(Clone)]
pub struct DeployClusterSpec(Vec<(RustCrate, Arc<dyn Host>)>);
1106
impl DeployClusterSpec {
    // Wraps the given `(crate, host)` pairs; each pair becomes one cluster member.
    #[expect(missing_docs, reason = "TODO")]
    pub fn new(crates: Vec<(RustCrate, Arc<dyn Host>)>) -> Self {
        Self(crates)
    }
}
1113
1114impl ClusterSpec<'_, HydroDeploy> for DeployClusterSpec {
1115 fn build(self, key: LocationKey, _name_hint: &str) -> DeployCluster {
1116 DeployCluster {
1117 key,
1118 next_port: Rc::new(RefCell::new(0)),
1119 cluster_spec: Rc::new(RefCell::new(Some(
1120 self.0
1121 .into_iter()
1122 .map(|(c, h)| CrateOrTrybuild::Crate(c, h))
1123 .collect(),
1124 ))),
1125 members: Rc::new(RefCell::new(vec![])),
1126 name_hint: None,
1127 }
1128 }
1129}
1130
1131impl<T: Into<TrybuildHost>, I: IntoIterator<Item = T>> ClusterSpec<'_, HydroDeploy> for I {
1132 fn build(self, key: LocationKey, name_hint: &str) -> DeployCluster {
1133 let name_hint = format!("{} (cluster {})", name_hint, key);
1134 DeployCluster {
1135 key,
1136 next_port: Rc::new(RefCell::new(0)),
1137 cluster_spec: Rc::new(RefCell::new(Some(
1138 self.into_iter()
1139 .enumerate()
1140 .map(|(idx, b)| {
1141 let mut b = b.into();
1142 b.name_hint = Some(name_hint.clone());
1143 b.cluster_idx = Some(idx);
1144 CrateOrTrybuild::Trybuild(b)
1145 })
1146 .collect(),
1147 ))),
1148 members: Rc::new(RefCell::new(vec![])),
1149 name_hint: Some(name_hint),
1150 }
1151 }
1152}
1153
1154fn create_trybuild_service(
1155 trybuild: TrybuildHost,
1156 dir: &std::path::Path,
1157 target_dir: &std::path::PathBuf,
1158 features: Option<&[String]>,
1159 bin_name: &str,
1160 linking_mode: &LinkingMode,
1161) -> RustCrate {
1162 let crate_dir = match linking_mode {
1164 LinkingMode::Dynamic => dir.join("dylib-examples"),
1165 LinkingMode::Static => dir.to_path_buf(),
1166 };
1167
1168 let mut ret = RustCrate::new(&crate_dir, dir)
1169 .target_dir(target_dir)
1170 .example(bin_name)
1171 .no_default_features();
1172
1173 ret = ret.set_is_dylib(matches!(linking_mode, LinkingMode::Dynamic));
1174
1175 if let Some(display_name) = trybuild.display_name {
1176 ret = ret.display_name(display_name);
1177 } else if let Some(name_hint) = trybuild.name_hint {
1178 if let Some(cluster_idx) = trybuild.cluster_idx {
1179 ret = ret.display_name(format!("{} / {}", name_hint, cluster_idx));
1180 } else {
1181 ret = ret.display_name(name_hint);
1182 }
1183 }
1184
1185 if let Some(rustflags) = trybuild.rustflags {
1186 ret = ret.rustflags(rustflags);
1187 }
1188
1189 if let Some(profile) = trybuild.profile {
1190 ret = ret.profile(profile);
1191 }
1192
1193 if let Some(tracing) = trybuild.tracing {
1194 ret = ret.tracing(tracing);
1195 }
1196
1197 if let Some(core) = trybuild.pin_to_core {
1198 ret = ret.pin_to_core(core);
1199 }
1200
1201 ret = ret.features(
1202 vec!["hydro___feature_deploy_integration".to_owned()]
1203 .into_iter()
1204 .chain(
1205 trybuild
1206 .additional_hydro_features
1207 .into_iter()
1208 .map(|runtime_feature| {
1209 assert!(
1210 HYDRO_RUNTIME_FEATURES.iter().any(|f| f == &runtime_feature),
1211 "{runtime_feature} is not a valid Hydro runtime feature"
1212 );
1213 format!("hydro___feature_{runtime_feature}")
1214 }),
1215 )
1216 .chain(trybuild.features),
1217 );
1218
1219 for (key, value) in trybuild.build_envs {
1220 ret = ret.build_env(key, value);
1221 }
1222
1223 for (key, value) in trybuild.env {
1224 ret = ret.env(key, value);
1225 }
1226
1227 ret = ret.build_env("STAGELEFT_TRYBUILD_BUILD_STAGED", "1");
1228 ret = ret.config("build.incremental = false");
1229
1230 if let Some(features) = features {
1231 ret = ret.features(features);
1232 }
1233
1234 ret
1235}