yew_agent/reactor/
hooks.rs1use std::any::type_name;
2use std::fmt;
3use std::ops::Deref;
4use std::rc::Rc;
5
6use futures::sink::SinkExt;
7use futures::stream::{SplitSink, StreamExt};
8use wasm_bindgen::UnwrapThrowExt;
9use yew::platform::pinned::RwLock;
10use yew::platform::spawn_local;
11use yew::prelude::*;
12
13use super::provider::ReactorProviderState;
14use super::{Reactor, ReactorBridge, ReactorScoped};
15use crate::utils::{BridgeIdState, OutputsAction, OutputsState};
16
17type ReactorTx<R> =
18 Rc<RwLock<SplitSink<ReactorBridge<R>, <<R as Reactor>::Scope as ReactorScoped>::Input>>>;
19
20pub enum ReactorEvent<R>
22where
23 R: Reactor,
24{
25 Output(<R::Scope as ReactorScoped>::Output),
27 Finished,
29}
30
31impl<R> fmt::Debug for ReactorEvent<R>
32where
33 R: Reactor<Scope: ReactorScoped<Output: fmt::Debug>>,
34{
35 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
36 match self {
37 Self::Output(m) => f.debug_tuple("ReactorEvent::Output").field(&m).finish(),
38 Self::Finished => f.debug_tuple("ReactorEvent::Finished").finish(),
39 }
40 }
41}
42
43pub struct UseReactorBridgeHandle<R>
45where
46 R: 'static + Reactor,
47{
48 tx: ReactorTx<R>,
49 ctr: UseReducerDispatcher<BridgeIdState>,
50}
51
52impl<R> fmt::Debug for UseReactorBridgeHandle<R>
53where
54 R: 'static + Reactor<Scope: ReactorScoped<Input: fmt::Debug>>,
55{
56 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
57 f.debug_struct(type_name::<Self>())
58 .field("inner", &self.tx)
59 .finish_non_exhaustive()
60 }
61}
62
63impl<R> Clone for UseReactorBridgeHandle<R>
64where
65 R: 'static + Reactor,
66{
67 fn clone(&self) -> Self {
68 Self {
69 tx: self.tx.clone(),
70 ctr: self.ctr.clone(),
71 }
72 }
73}
74
75impl<R> UseReactorBridgeHandle<R>
76where
77 R: 'static + Reactor,
78{
79 pub fn send(&self, msg: <R::Scope as ReactorScoped>::Input) {
81 let tx = self.tx.clone();
82 spawn_local(async move {
83 let mut tx = tx.write().await;
84 let _ = tx.send(msg).await;
85 });
86 }
87
88 pub fn reset(&self) {
92 self.ctr.dispatch(());
93 }
94}
95
96impl<R> PartialEq for UseReactorBridgeHandle<R>
97where
98 R: 'static + Reactor,
99{
100 fn eq(&self, rhs: &Self) -> bool {
101 self.ctr == rhs.ctr
102 }
103}
104
105#[hook]
114pub fn use_reactor_bridge<R, F>(on_output: F) -> UseReactorBridgeHandle<R>
115where
116 R: 'static + Reactor,
117 F: Fn(ReactorEvent<R>) + 'static,
118{
119 let ctr = use_reducer(BridgeIdState::default);
120
121 let worker_state = use_context::<ReactorProviderState<R>>()
122 .expect_throw("cannot find a provider for current agent.");
123
124 let on_output = Rc::new(on_output);
125
126 let on_output_ref = {
127 let on_output = on_output.clone();
128 use_mut_ref(move || on_output)
129 };
130
131 {
133 let mut on_output_ref = on_output_ref.borrow_mut();
134 *on_output_ref = on_output;
135 }
136
137 let tx = use_memo((worker_state, ctr.inner), |(state, _ctr)| {
138 let bridge = state.create_bridge();
139
140 let (tx, mut rx) = bridge.split();
141
142 spawn_local(async move {
143 while let Some(m) = rx.next().await {
144 let on_output = on_output_ref.borrow().clone();
145 on_output(ReactorEvent::<R>::Output(m));
146 }
147
148 let on_output = on_output_ref.borrow().clone();
149 on_output(ReactorEvent::<R>::Finished);
150 });
151
152 RwLock::new(tx)
153 });
154
155 UseReactorBridgeHandle {
156 tx: tx.clone(),
157 ctr: ctr.dispatcher(),
158 }
159}
160
161pub struct UseReactorSubscriptionHandle<R>
163where
164 R: 'static + Reactor,
165{
166 bridge: UseReactorBridgeHandle<R>,
167 outputs: Vec<Rc<<R::Scope as ReactorScoped>::Output>>,
168 finished: bool,
169 ctr: usize,
170}
171
172impl<R> UseReactorSubscriptionHandle<R>
173where
174 R: 'static + Reactor,
175{
176 pub fn send(&self, msg: <R::Scope as ReactorScoped>::Input) {
178 self.bridge.send(msg);
179 }
180
181 pub fn finished(&self) -> bool {
183 self.finished
184 }
185
186 pub fn reset(&self) {
191 self.bridge.reset();
192 }
193}
194
195impl<R> Clone for UseReactorSubscriptionHandle<R>
196where
197 R: 'static + Reactor,
198{
199 fn clone(&self) -> Self {
200 Self {
201 bridge: self.bridge.clone(),
202 outputs: self.outputs.clone(),
203 ctr: self.ctr,
204 finished: self.finished,
205 }
206 }
207}
208
209impl<R> fmt::Debug for UseReactorSubscriptionHandle<R>
210where
211 R: 'static + Reactor<Scope: ReactorScoped<Input: fmt::Debug, Output: fmt::Debug>>,
212{
213 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
214 f.debug_struct(type_name::<Self>())
215 .field("bridge", &self.bridge)
216 .field("outputs", &self.outputs)
217 .finish_non_exhaustive()
218 }
219}
220
221impl<R> Deref for UseReactorSubscriptionHandle<R>
222where
223 R: 'static + Reactor,
224{
225 type Target = [Rc<<R::Scope as ReactorScoped>::Output>];
226
227 fn deref(&self) -> &Self::Target {
228 &self.outputs
229 }
230}
231
232impl<R> PartialEq for UseReactorSubscriptionHandle<R>
233where
234 R: 'static + Reactor,
235{
236 fn eq(&self, rhs: &Self) -> bool {
237 self.bridge == rhs.bridge && self.ctr == rhs.ctr
238 }
239}
240
241#[hook]
245pub fn use_reactor_subscription<R>() -> UseReactorSubscriptionHandle<R>
246where
247 R: 'static + Reactor,
248{
249 let outputs = use_reducer(OutputsState::<<R::Scope as ReactorScoped>::Output>::default);
250
251 let bridge = {
252 let outputs = outputs.clone();
253 use_reactor_bridge::<R, _>(move |output| {
254 outputs.dispatch(match output {
255 ReactorEvent::Output(m) => OutputsAction::Push(m.into()),
256 ReactorEvent::Finished => OutputsAction::Close,
257 })
258 })
259 };
260
261 {
262 let outputs = outputs.clone();
263 use_effect_with(bridge.clone(), move |_| {
264 outputs.dispatch(OutputsAction::Reset);
265
266 || {}
267 });
268 }
269
270 UseReactorSubscriptionHandle {
271 bridge,
272 outputs: outputs.inner.clone(),
273 ctr: outputs.ctr,
274 finished: outputs.closed,
275 }
276}