Skip to main content

hydro_lang/compile/
deploy_provider.rs

1use std::io::Error;
2use std::pin::Pin;
3
4use bytes::{Bytes, BytesMut};
5use dfir_lang::graph::DfirGraph;
6use futures::{Sink, Stream};
7use serde::Serialize;
8use serde::de::DeserializeOwned;
9use stageleft::QuotedWithContext;
10
11use crate::compile::builder::ExternalPortId;
12use crate::location::dynamic::LocationId;
13use crate::location::member_id::TaglessMemberId;
14use crate::location::{LocationKey, MembershipEvent, NetworkHint};
15
16pub trait Deploy<'a> {
17    type Meta: Default;
18    type InstantiateEnv;
19
20    type Process: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv> + Clone;
21    type Cluster: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv> + Clone;
22    type External: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv>
23        + RegisterPort<'a, Self>;
24
25    /// Generates the source and sink expressions when connecting a [`Self::Process`] to another
26    /// [`Self::Process`].
27    ///
28    /// The [`Self::InstantiateEnv`] can be used to record metadata about the created channel. The
29    /// provided `name` is the user-configured channel name from the network IR node.
30    fn o2o_sink_source(
31        env: &mut Self::InstantiateEnv,
32        p1: &Self::Process,
33        p1_port: &<Self::Process as Node>::Port,
34        p2: &Self::Process,
35        p2_port: &<Self::Process as Node>::Port,
36        name: Option<&str>,
37        networking_info: &crate::networking::NetworkingInfo,
38    ) -> (syn::Expr, syn::Expr);
39
40    /// Performs any runtime wiring needed after code generation for a
41    /// [`Self::Process`]-to-[`Self::Process`] channel.
42    ///
43    /// The returned closure is executed once all locations have been instantiated.
44    fn o2o_connect(
45        p1: &Self::Process,
46        p1_port: &<Self::Process as Node>::Port,
47        p2: &Self::Process,
48        p2_port: &<Self::Process as Node>::Port,
49    ) -> Box<dyn FnOnce()>;
50
51    /// Generates the source and sink expressions when connecting a [`Self::Process`] to a
52    /// [`Self::Cluster`] (one-to-many).
53    ///
54    /// The sink expression is used on the sending process and the source expression on each
55    /// receiving cluster member. The [`Self::InstantiateEnv`] can be used to record metadata
56    /// about the created channel. The provided `name` is the user-configured channel name
57    /// from the network IR node.
58    fn o2m_sink_source(
59        env: &mut Self::InstantiateEnv,
60        p1: &Self::Process,
61        p1_port: &<Self::Process as Node>::Port,
62        c2: &Self::Cluster,
63        c2_port: &<Self::Cluster as Node>::Port,
64        name: Option<&str>,
65        networking_info: &crate::networking::NetworkingInfo,
66    ) -> (syn::Expr, syn::Expr);
67
68    /// Performs any runtime wiring needed after code generation for a
69    /// [`Self::Process`]-to-[`Self::Cluster`] channel.
70    ///
71    /// The returned closure is executed once all locations have been instantiated.
72    fn o2m_connect(
73        p1: &Self::Process,
74        p1_port: &<Self::Process as Node>::Port,
75        c2: &Self::Cluster,
76        c2_port: &<Self::Cluster as Node>::Port,
77    ) -> Box<dyn FnOnce()>;
78
79    /// Generates the source and sink expressions when connecting a [`Self::Cluster`] to a
80    /// [`Self::Process`] (many-to-one).
81    ///
82    /// The sink expression is used on each sending cluster member and the source expression
83    /// on the receiving process. The [`Self::InstantiateEnv`] can be used to record metadata
84    /// about the created channel. The provided `name` is the user-configured channel name
85    /// from the network IR node.
86    fn m2o_sink_source(
87        env: &mut Self::InstantiateEnv,
88        c1: &Self::Cluster,
89        c1_port: &<Self::Cluster as Node>::Port,
90        p2: &Self::Process,
91        p2_port: &<Self::Process as Node>::Port,
92        name: Option<&str>,
93        networking_info: &crate::networking::NetworkingInfo,
94    ) -> (syn::Expr, syn::Expr);
95
96    /// Performs any runtime wiring needed after code generation for a
97    /// [`Self::Cluster`]-to-[`Self::Process`] channel.
98    ///
99    /// The returned closure is executed once all locations have been instantiated.
100    fn m2o_connect(
101        c1: &Self::Cluster,
102        c1_port: &<Self::Cluster as Node>::Port,
103        p2: &Self::Process,
104        p2_port: &<Self::Process as Node>::Port,
105    ) -> Box<dyn FnOnce()>;
106
107    /// Generates the source and sink expressions when connecting a [`Self::Cluster`] to another
108    /// [`Self::Cluster`] (many-to-many).
109    ///
110    /// The sink expression is used on each sending cluster member and the source expression
111    /// on each receiving cluster member. The [`Self::InstantiateEnv`] can be used to record
112    /// metadata about the created channel. The provided `name` is the user-configured channel
113    /// name from the network IR node.
114    fn m2m_sink_source(
115        env: &mut Self::InstantiateEnv,
116        c1: &Self::Cluster,
117        c1_port: &<Self::Cluster as Node>::Port,
118        c2: &Self::Cluster,
119        c2_port: &<Self::Cluster as Node>::Port,
120        name: Option<&str>,
121        networking_info: &crate::networking::NetworkingInfo,
122    ) -> (syn::Expr, syn::Expr);
123
124    /// Performs any runtime wiring needed after code generation for a
125    /// [`Self::Cluster`]-to-[`Self::Cluster`] channel.
126    ///
127    /// The returned closure is executed once all locations have been instantiated.
128    fn m2m_connect(
129        c1: &Self::Cluster,
130        c1_port: &<Self::Cluster as Node>::Port,
131        c2: &Self::Cluster,
132        c2_port: &<Self::Cluster as Node>::Port,
133    ) -> Box<dyn FnOnce()>;
134
135    fn e2o_many_source(
136        extra_stmts: &mut Vec<syn::Stmt>,
137        p2: &Self::Process,
138        p2_port: &<Self::Process as Node>::Port,
139        codec_type: &syn::Type,
140        shared_handle: String,
141    ) -> syn::Expr;
142    fn e2o_many_sink(shared_handle: String) -> syn::Expr;
143
144    fn e2o_source(
145        extra_stmts: &mut Vec<syn::Stmt>,
146        p1: &Self::External,
147        p1_port: &<Self::External as Node>::Port,
148        p2: &Self::Process,
149        p2_port: &<Self::Process as Node>::Port,
150        codec_type: &syn::Type,
151        shared_handle: String,
152    ) -> syn::Expr;
153    fn e2o_connect(
154        p1: &Self::External,
155        p1_port: &<Self::External as Node>::Port,
156        p2: &Self::Process,
157        p2_port: &<Self::Process as Node>::Port,
158        many: bool,
159        server_hint: NetworkHint,
160    ) -> Box<dyn FnOnce()>;
161
162    fn o2e_sink(
163        p1: &Self::Process,
164        p1_port: &<Self::Process as Node>::Port,
165        p2: &Self::External,
166        p2_port: &<Self::External as Node>::Port,
167        shared_handle: String,
168    ) -> syn::Expr;
169
170    fn e2m_source(
171        extra_stmts: &mut Vec<syn::Stmt>,
172        p1: &Self::External,
173        p1_port: &<Self::External as Node>::Port,
174        c2: &Self::Cluster,
175        c2_port: &<Self::Cluster as Node>::Port,
176        codec_type: &syn::Type,
177        shared_handle: String,
178    ) -> syn::Expr {
179        let _ = (
180            extra_stmts,
181            p1,
182            p1_port,
183            c2,
184            c2_port,
185            codec_type,
186            shared_handle,
187        );
188        todo!("e2m_source is not yet supported for this deploy backend")
189    }
190
191    fn e2m_connect(
192        p1: &Self::External,
193        p1_port: &<Self::External as Node>::Port,
194        c2: &Self::Cluster,
195        c2_port: &<Self::Cluster as Node>::Port,
196        server_hint: NetworkHint,
197    ) -> Box<dyn FnOnce()> {
198        let _ = (p1, p1_port, c2, c2_port, server_hint);
199        todo!("e2m_connect is not yet supported for this deploy backend")
200    }
201
202    fn m2e_sink(
203        c1: &Self::Cluster,
204        c1_port: &<Self::Cluster as Node>::Port,
205        p2: &Self::External,
206        p2_port: &<Self::External as Node>::Port,
207        shared_handle: String,
208    ) -> syn::Expr {
209        let _ = (c1, c1_port, p2, p2_port, shared_handle);
210        todo!("m2e_sink is not yet supported for this deploy backend")
211    }
212
213    fn cluster_ids(
214        of_cluster: LocationKey,
215    ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a;
216
217    fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a;
218
219    fn cluster_membership_stream(
220        env: &mut Self::InstantiateEnv,
221        at_location: &LocationId,
222        location_id: &LocationId,
223    ) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>;
224
225    /// Registers an embedded stream input for the given ident and element type.
226    ///
227    /// Only meaningful for the embedded deployment backend. The default
228    /// implementation panics.
229    fn register_embedded_stream_input(
230        _env: &mut Self::InstantiateEnv,
231        _location_key: LocationKey,
232        _ident: &syn::Ident,
233        _element_type: &syn::Type,
234    ) {
235        panic!("register_embedded_stream_input is only supported by EmbeddedDeploy");
236    }
237
238    /// Registers an embedded singleton input for the given ident and element type.
239    ///
240    /// Only meaningful for the embedded deployment backend. The default
241    /// implementation panics.
242    fn register_embedded_singleton_input(
243        _env: &mut Self::InstantiateEnv,
244        _location_key: LocationKey,
245        _ident: &syn::Ident,
246        _element_type: &syn::Type,
247    ) {
248        panic!("register_embedded_singleton_input is only supported by EmbeddedDeploy");
249    }
250
251    /// Registers an embedded output for the given ident and element type.
252    ///
253    /// Only meaningful for the embedded deployment backend. The default
254    /// implementation panics.
255    fn register_embedded_output(
256        _env: &mut Self::InstantiateEnv,
257        _location_key: LocationKey,
258        _ident: &syn::Ident,
259        _element_type: &syn::Type,
260    ) {
261        panic!("register_embedded_output is only supported by EmbeddedDeploy");
262    }
263}
264
265pub trait ProcessSpec<'a, D>
266where
267    D: Deploy<'a> + ?Sized,
268{
269    fn build(self, location_key: LocationKey, name_hint: &str) -> D::Process;
270}
271
272pub trait IntoProcessSpec<'a, D>
273where
274    D: Deploy<'a> + ?Sized,
275{
276    type ProcessSpec: ProcessSpec<'a, D>;
277    fn into_process_spec(self) -> Self::ProcessSpec;
278}
279
280impl<'a, D, T> IntoProcessSpec<'a, D> for T
281where
282    D: Deploy<'a> + ?Sized,
283    T: ProcessSpec<'a, D>,
284{
285    type ProcessSpec = T;
286    fn into_process_spec(self) -> Self::ProcessSpec {
287        self
288    }
289}
290
291pub trait ClusterSpec<'a, D>
292where
293    D: Deploy<'a> + ?Sized,
294{
295    fn build(self, location_key: LocationKey, name_hint: &str) -> D::Cluster;
296}
297
298pub trait ExternalSpec<'a, D>
299where
300    D: Deploy<'a> + ?Sized,
301{
302    fn build(self, location_key: LocationKey, name_hint: &str) -> D::External;
303}
304
305pub trait Node {
306    /// A logical communication endpoint for this node.
307    ///
308    /// Implementors are free to choose the concrete representation (for example,
309    /// a handle or identifier), but it must be `Clone` so that a single logical
310    /// port can be duplicated and passed to multiple consumers. New ports are
311    /// allocated via [`Self::next_port`].
312    type Port: Clone;
313    type Meta: Default;
314    type InstantiateEnv;
315
316    /// Allocates and returns a new port.
317    fn next_port(&self) -> Self::Port;
318
319    fn update_meta(&self, meta: &Self::Meta);
320
321    fn instantiate(
322        &self,
323        env: &mut Self::InstantiateEnv,
324        meta: &mut Self::Meta,
325        graph: DfirGraph,
326        extra_stmts: &[syn::Stmt],
327        sidecars: &[syn::Expr],
328    );
329}
330
331pub type DynSourceSink<Out, In, InErr> = (
332    Pin<Box<dyn Stream<Item = Out>>>,
333    Pin<Box<dyn Sink<In, Error = InErr>>>,
334);
335
336pub trait RegisterPort<'a, D>: Node + Clone
337where
338    D: Deploy<'a> + ?Sized,
339{
340    fn register(&self, external_port_id: ExternalPortId, port: Self::Port);
341
342    fn as_bytes_bidi(
343        &self,
344        external_port_id: ExternalPortId,
345    ) -> impl Future<Output = DynSourceSink<Result<BytesMut, Error>, Bytes, Error>> + 'a;
346
347    fn as_bincode_bidi<InT, OutT>(
348        &self,
349        external_port_id: ExternalPortId,
350    ) -> impl Future<Output = DynSourceSink<OutT, InT, Error>> + 'a
351    where
352        InT: Serialize + 'static,
353        OutT: DeserializeOwned + 'static;
354
355    fn as_bincode_sink<T>(
356        &self,
357        external_port_id: ExternalPortId,
358    ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = Error>>>> + 'a
359    where
360        T: Serialize + 'static;
361
362    fn as_bincode_source<T>(
363        &self,
364        external_port_id: ExternalPortId,
365    ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a
366    where
367        T: DeserializeOwned + 'static;
368}