1use std::any::type_name;
4use std::fmt;
5use std::rc::Rc;
6
7use futures::stream::SplitSink;
8use futures::{SinkExt, StreamExt};
9use wasm_bindgen::UnwrapThrowExt;
10use yew::html::Scope;
11use yew::platform::pinned::RwLock;
12use yew::platform::spawn_local;
13use yew::prelude::*;
14
15use crate::oneshot::{Oneshot, OneshotProviderState};
16use crate::reactor::{Reactor, ReactorBridge, ReactorEvent, ReactorProviderState, ReactorScoped};
17use crate::worker::{Worker, WorkerBridge, WorkerProviderState};
18
19#[derive(Debug)]
21pub struct WorkerBridgeHandle<W>
22where
23 W: Worker,
24{
25 inner: WorkerBridge<W>,
26}
27
28impl<W> WorkerBridgeHandle<W>
29where
30 W: Worker,
31{
32 pub fn send(&self, input: W::Input) {
34 self.inner.send(input)
35 }
36}
37
38type ReactorTx<R> =
39 Rc<RwLock<SplitSink<ReactorBridge<R>, <<R as Reactor>::Scope as ReactorScoped>::Input>>>;
40
41pub struct ReactorBridgeHandle<R>
43where
44 R: Reactor + 'static,
45{
46 tx: ReactorTx<R>,
47}
48
49impl<R> fmt::Debug for ReactorBridgeHandle<R>
50where
51 R: Reactor + 'static,
52{
53 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
54 f.debug_struct(type_name::<Self>()).finish_non_exhaustive()
55 }
56}
57
58impl<R> ReactorBridgeHandle<R>
59where
60 R: Reactor + 'static,
61{
62 pub fn send(&self, input: <R::Scope as ReactorScoped>::Input) {
64 let tx = self.tx.clone();
65 spawn_local(async move {
66 let mut tx = tx.write().await;
67 let _ = tx.send(input).await;
68 });
69 }
70}
71
72pub trait AgentScopeExt {
76 fn bridge_worker<W>(&self, callback: Callback<W::Output>) -> WorkerBridgeHandle<W>
78 where
79 W: Worker + 'static;
80
81 fn bridge_reactor<R>(&self, callback: Callback<ReactorEvent<R>>) -> ReactorBridgeHandle<R>
83 where
84 R: Reactor<Scope: ReactorScoped<Output: 'static>> + 'static;
85
86 fn run_oneshot<T>(&self, input: T::Input, callback: Callback<T::Output>)
88 where
89 T: Oneshot + 'static;
90}
91
92impl<COMP> AgentScopeExt for Scope<COMP>
93where
94 COMP: Component,
95{
96 fn bridge_worker<W>(&self, callback: Callback<W::Output>) -> WorkerBridgeHandle<W>
97 where
98 W: Worker + 'static,
99 {
100 let inner = self
101 .context::<Rc<WorkerProviderState<W>>>((|_| {}).into())
102 .expect_throw("failed to bridge to agent.")
103 .0
104 .create_bridge(callback);
105
106 WorkerBridgeHandle { inner }
107 }
108
109 fn bridge_reactor<R>(&self, callback: Callback<ReactorEvent<R>>) -> ReactorBridgeHandle<R>
110 where
111 R: Reactor<Scope: ReactorScoped<Output: 'static>> + 'static,
112 {
113 let (tx, mut rx) = self
114 .context::<ReactorProviderState<R>>((|_| {}).into())
115 .expect_throw("failed to bridge to agent.")
116 .0
117 .create_bridge()
118 .split();
119
120 spawn_local(async move {
121 while let Some(m) = rx.next().await {
122 callback.emit(ReactorEvent::<R>::Output(m));
123 }
124
125 callback.emit(ReactorEvent::<R>::Finished);
126 });
127
128 let tx = Rc::new(RwLock::new(tx));
129
130 ReactorBridgeHandle { tx }
131 }
132
133 fn run_oneshot<T>(&self, input: T::Input, callback: Callback<T::Output>)
134 where
135 T: Oneshot + 'static,
136 {
137 let (inner, _) = self
138 .context::<OneshotProviderState<T>>((|_| {}).into())
139 .expect_throw("failed to bridge to agent.");
140
141 spawn_local(async move { callback.emit(inner.create_bridge().run(input).await) });
142 }
143}