1use std::io::Error;
2use std::pin::Pin;
3
4use bytes::{Bytes, BytesMut};
5use dfir_lang::graph::DfirGraph;
6use futures::{Sink, Stream};
7use serde::Serialize;
8use serde::de::DeserializeOwned;
9use stageleft::QuotedWithContext;
10
11use crate::compile::builder::ExternalPortId;
12use crate::location::dynamic::LocationId;
13use crate::location::member_id::TaglessMemberId;
14use crate::location::{LocationKey, MembershipEvent, NetworkHint};
15
16pub trait Deploy<'a> {
17 type Meta: Default;
18 type InstantiateEnv;
19
20 type Process: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv> + Clone;
21 type Cluster: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv> + Clone;
22 type External: Node<Meta = Self::Meta, InstantiateEnv = Self::InstantiateEnv>
23 + RegisterPort<'a, Self>;
24
25 fn o2o_sink_source(
31 env: &mut Self::InstantiateEnv,
32 p1: &Self::Process,
33 p1_port: &<Self::Process as Node>::Port,
34 p2: &Self::Process,
35 p2_port: &<Self::Process as Node>::Port,
36 name: Option<&str>,
37 networking_info: &crate::networking::NetworkingInfo,
38 ) -> (syn::Expr, syn::Expr);
39
40 fn o2o_connect(
45 p1: &Self::Process,
46 p1_port: &<Self::Process as Node>::Port,
47 p2: &Self::Process,
48 p2_port: &<Self::Process as Node>::Port,
49 ) -> Box<dyn FnOnce()>;
50
51 fn o2m_sink_source(
59 env: &mut Self::InstantiateEnv,
60 p1: &Self::Process,
61 p1_port: &<Self::Process as Node>::Port,
62 c2: &Self::Cluster,
63 c2_port: &<Self::Cluster as Node>::Port,
64 name: Option<&str>,
65 networking_info: &crate::networking::NetworkingInfo,
66 ) -> (syn::Expr, syn::Expr);
67
68 fn o2m_connect(
73 p1: &Self::Process,
74 p1_port: &<Self::Process as Node>::Port,
75 c2: &Self::Cluster,
76 c2_port: &<Self::Cluster as Node>::Port,
77 ) -> Box<dyn FnOnce()>;
78
79 fn m2o_sink_source(
87 env: &mut Self::InstantiateEnv,
88 c1: &Self::Cluster,
89 c1_port: &<Self::Cluster as Node>::Port,
90 p2: &Self::Process,
91 p2_port: &<Self::Process as Node>::Port,
92 name: Option<&str>,
93 networking_info: &crate::networking::NetworkingInfo,
94 ) -> (syn::Expr, syn::Expr);
95
96 fn m2o_connect(
101 c1: &Self::Cluster,
102 c1_port: &<Self::Cluster as Node>::Port,
103 p2: &Self::Process,
104 p2_port: &<Self::Process as Node>::Port,
105 ) -> Box<dyn FnOnce()>;
106
107 fn m2m_sink_source(
115 env: &mut Self::InstantiateEnv,
116 c1: &Self::Cluster,
117 c1_port: &<Self::Cluster as Node>::Port,
118 c2: &Self::Cluster,
119 c2_port: &<Self::Cluster as Node>::Port,
120 name: Option<&str>,
121 networking_info: &crate::networking::NetworkingInfo,
122 ) -> (syn::Expr, syn::Expr);
123
124 fn m2m_connect(
129 c1: &Self::Cluster,
130 c1_port: &<Self::Cluster as Node>::Port,
131 c2: &Self::Cluster,
132 c2_port: &<Self::Cluster as Node>::Port,
133 ) -> Box<dyn FnOnce()>;
134
135 fn e2o_many_source(
136 extra_stmts: &mut Vec<syn::Stmt>,
137 p2: &Self::Process,
138 p2_port: &<Self::Process as Node>::Port,
139 codec_type: &syn::Type,
140 shared_handle: String,
141 ) -> syn::Expr;
142 fn e2o_many_sink(shared_handle: String) -> syn::Expr;
143
144 fn e2o_source(
145 extra_stmts: &mut Vec<syn::Stmt>,
146 p1: &Self::External,
147 p1_port: &<Self::External as Node>::Port,
148 p2: &Self::Process,
149 p2_port: &<Self::Process as Node>::Port,
150 codec_type: &syn::Type,
151 shared_handle: String,
152 ) -> syn::Expr;
153 fn e2o_connect(
154 p1: &Self::External,
155 p1_port: &<Self::External as Node>::Port,
156 p2: &Self::Process,
157 p2_port: &<Self::Process as Node>::Port,
158 many: bool,
159 server_hint: NetworkHint,
160 ) -> Box<dyn FnOnce()>;
161
162 fn o2e_sink(
163 p1: &Self::Process,
164 p1_port: &<Self::Process as Node>::Port,
165 p2: &Self::External,
166 p2_port: &<Self::External as Node>::Port,
167 shared_handle: String,
168 ) -> syn::Expr;
169
170 fn e2m_source(
171 extra_stmts: &mut Vec<syn::Stmt>,
172 p1: &Self::External,
173 p1_port: &<Self::External as Node>::Port,
174 c2: &Self::Cluster,
175 c2_port: &<Self::Cluster as Node>::Port,
176 codec_type: &syn::Type,
177 shared_handle: String,
178 ) -> syn::Expr {
179 let _ = (
180 extra_stmts,
181 p1,
182 p1_port,
183 c2,
184 c2_port,
185 codec_type,
186 shared_handle,
187 );
188 todo!("e2m_source is not yet supported for this deploy backend")
189 }
190
191 fn e2m_connect(
192 p1: &Self::External,
193 p1_port: &<Self::External as Node>::Port,
194 c2: &Self::Cluster,
195 c2_port: &<Self::Cluster as Node>::Port,
196 server_hint: NetworkHint,
197 ) -> Box<dyn FnOnce()> {
198 let _ = (p1, p1_port, c2, c2_port, server_hint);
199 todo!("e2m_connect is not yet supported for this deploy backend")
200 }
201
202 fn m2e_sink(
203 c1: &Self::Cluster,
204 c1_port: &<Self::Cluster as Node>::Port,
205 p2: &Self::External,
206 p2_port: &<Self::External as Node>::Port,
207 shared_handle: String,
208 ) -> syn::Expr {
209 let _ = (c1, c1_port, p2, p2_port, shared_handle);
210 todo!("m2e_sink is not yet supported for this deploy backend")
211 }
212
213 fn cluster_ids(
214 of_cluster: LocationKey,
215 ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a;
216
217 fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a;
218
219 fn cluster_membership_stream(
220 env: &mut Self::InstantiateEnv,
221 at_location: &LocationId,
222 location_id: &LocationId,
223 ) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>;
224
225 fn register_embedded_stream_input(
230 _env: &mut Self::InstantiateEnv,
231 _location_key: LocationKey,
232 _ident: &syn::Ident,
233 _element_type: &syn::Type,
234 ) {
235 panic!("register_embedded_stream_input is only supported by EmbeddedDeploy");
236 }
237
238 fn register_embedded_singleton_input(
243 _env: &mut Self::InstantiateEnv,
244 _location_key: LocationKey,
245 _ident: &syn::Ident,
246 _element_type: &syn::Type,
247 ) {
248 panic!("register_embedded_singleton_input is only supported by EmbeddedDeploy");
249 }
250
251 fn register_embedded_output(
256 _env: &mut Self::InstantiateEnv,
257 _location_key: LocationKey,
258 _ident: &syn::Ident,
259 _element_type: &syn::Type,
260 ) {
261 panic!("register_embedded_output is only supported by EmbeddedDeploy");
262 }
263}
264
265pub trait ProcessSpec<'a, D>
266where
267 D: Deploy<'a> + ?Sized,
268{
269 fn build(self, location_key: LocationKey, name_hint: &str) -> D::Process;
270}
271
272pub trait IntoProcessSpec<'a, D>
273where
274 D: Deploy<'a> + ?Sized,
275{
276 type ProcessSpec: ProcessSpec<'a, D>;
277 fn into_process_spec(self) -> Self::ProcessSpec;
278}
279
280impl<'a, D, T> IntoProcessSpec<'a, D> for T
281where
282 D: Deploy<'a> + ?Sized,
283 T: ProcessSpec<'a, D>,
284{
285 type ProcessSpec = T;
286 fn into_process_spec(self) -> Self::ProcessSpec {
287 self
288 }
289}
290
291pub trait ClusterSpec<'a, D>
292where
293 D: Deploy<'a> + ?Sized,
294{
295 fn build(self, location_key: LocationKey, name_hint: &str) -> D::Cluster;
296}
297
298pub trait ExternalSpec<'a, D>
299where
300 D: Deploy<'a> + ?Sized,
301{
302 fn build(self, location_key: LocationKey, name_hint: &str) -> D::External;
303}
304
305pub trait Node {
306 type Port: Clone;
313 type Meta: Default;
314 type InstantiateEnv;
315
316 fn next_port(&self) -> Self::Port;
318
319 fn update_meta(&self, meta: &Self::Meta);
320
321 fn instantiate(
322 &self,
323 env: &mut Self::InstantiateEnv,
324 meta: &mut Self::Meta,
325 graph: DfirGraph,
326 extra_stmts: &[syn::Stmt],
327 sidecars: &[syn::Expr],
328 );
329}
330
331pub type DynSourceSink<Out, In, InErr> = (
332 Pin<Box<dyn Stream<Item = Out>>>,
333 Pin<Box<dyn Sink<In, Error = InErr>>>,
334);
335
336pub trait RegisterPort<'a, D>: Node + Clone
337where
338 D: Deploy<'a> + ?Sized,
339{
340 fn register(&self, external_port_id: ExternalPortId, port: Self::Port);
341
342 fn as_bytes_bidi(
343 &self,
344 external_port_id: ExternalPortId,
345 ) -> impl Future<Output = DynSourceSink<Result<BytesMut, Error>, Bytes, Error>> + 'a;
346
347 fn as_bincode_bidi<InT, OutT>(
348 &self,
349 external_port_id: ExternalPortId,
350 ) -> impl Future<Output = DynSourceSink<OutT, InT, Error>> + 'a
351 where
352 InT: Serialize + 'static,
353 OutT: DeserializeOwned + 'static;
354
355 fn as_bincode_sink<T>(
356 &self,
357 external_port_id: ExternalPortId,
358 ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = Error>>>> + 'a
359 where
360 T: Serialize + 'static;
361
362 fn as_bincode_source<T>(
363 &self,
364 external_port_id: ExternalPortId,
365 ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a
366 where
367 T: DeserializeOwned + 'static;
368}