Skip to main content
This is unreleased documentation for the Yew Next version.
For up-to-date documentation, see the latest version on docs.rs.

yew_agent/reactor/
hooks.rs

1use std::any::type_name;
2use std::fmt;
3use std::ops::Deref;
4use std::rc::Rc;
5
6use futures::sink::SinkExt;
7use futures::stream::{SplitSink, StreamExt};
8use wasm_bindgen::UnwrapThrowExt;
9use yew::platform::pinned::RwLock;
10use yew::platform::spawn_local;
11use yew::prelude::*;
12
13use super::provider::ReactorProviderState;
14use super::{Reactor, ReactorBridge, ReactorScoped};
15use crate::utils::{BridgeIdState, OutputsAction, OutputsState};
16
17type ReactorTx<R> =
18    Rc<RwLock<SplitSink<ReactorBridge<R>, <<R as Reactor>::Scope as ReactorScoped>::Input>>>;
19
20/// A type that represents events from a reactor.
21pub enum ReactorEvent<R>
22where
23    R: Reactor,
24{
25    /// The reactor agent has sent an output.
26    Output(<R::Scope as ReactorScoped>::Output),
27    /// The reactor agent has exited.
28    Finished,
29}
30
31impl<R> fmt::Debug for ReactorEvent<R>
32where
33    R: Reactor<Scope: ReactorScoped<Output: fmt::Debug>>,
34{
35    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
36        match self {
37            Self::Output(m) => f.debug_tuple("ReactorEvent::Output").field(&m).finish(),
38            Self::Finished => f.debug_tuple("ReactorEvent::Finished").finish(),
39        }
40    }
41}
42
43/// Hook handle for the [`use_reactor_bridge`] hook.
44pub struct UseReactorBridgeHandle<R>
45where
46    R: 'static + Reactor,
47{
48    tx: ReactorTx<R>,
49    ctr: UseReducerDispatcher<BridgeIdState>,
50}
51
52impl<R> fmt::Debug for UseReactorBridgeHandle<R>
53where
54    R: 'static + Reactor<Scope: ReactorScoped<Input: fmt::Debug>>,
55{
56    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
57        f.debug_struct(type_name::<Self>())
58            .field("inner", &self.tx)
59            .finish_non_exhaustive()
60    }
61}
62
63impl<R> Clone for UseReactorBridgeHandle<R>
64where
65    R: 'static + Reactor,
66{
67    fn clone(&self) -> Self {
68        Self {
69            tx: self.tx.clone(),
70            ctr: self.ctr.clone(),
71        }
72    }
73}
74
75impl<R> UseReactorBridgeHandle<R>
76where
77    R: 'static + Reactor,
78{
79    /// Send an input to a reactor agent.
80    pub fn send(&self, msg: <R::Scope as ReactorScoped>::Input) {
81        let tx = self.tx.clone();
82        spawn_local(async move {
83            let mut tx = tx.write().await;
84            let _ = tx.send(msg).await;
85        });
86    }
87
88    /// Reset the bridge.
89    ///
90    /// Disconnect the old bridge and re-connects the agent with a new bridge.
91    pub fn reset(&self) {
92        self.ctr.dispatch(());
93    }
94}
95
96impl<R> PartialEq for UseReactorBridgeHandle<R>
97where
98    R: 'static + Reactor,
99{
100    fn eq(&self, rhs: &Self) -> bool {
101        self.ctr == rhs.ctr
102    }
103}
104
105/// A hook to bridge to a [`Reactor`].
106///
107/// This hooks will only bridge the reactor once over the entire component lifecycle.
108///
109/// Takes a callback as the argument.
110///
111/// The callback will be updated on every render to make sure captured values (if any) are up to
112/// date.
113#[hook]
114pub fn use_reactor_bridge<R, F>(on_output: F) -> UseReactorBridgeHandle<R>
115where
116    R: 'static + Reactor,
117    F: Fn(ReactorEvent<R>) + 'static,
118{
119    let ctr = use_reducer(BridgeIdState::default);
120
121    let worker_state = use_context::<ReactorProviderState<R>>()
122        .expect_throw("cannot find a provider for current agent.");
123
124    let on_output = Rc::new(on_output);
125
126    let on_output_ref = {
127        let on_output = on_output.clone();
128        use_mut_ref(move || on_output)
129    };
130
131    // Refresh the callback on every render.
132    {
133        let mut on_output_ref = on_output_ref.borrow_mut();
134        *on_output_ref = on_output;
135    }
136
137    let tx = use_memo((worker_state, ctr.inner), |(state, _ctr)| {
138        let bridge = state.create_bridge();
139
140        let (tx, mut rx) = bridge.split();
141
142        spawn_local(async move {
143            while let Some(m) = rx.next().await {
144                let on_output = on_output_ref.borrow().clone();
145                on_output(ReactorEvent::<R>::Output(m));
146            }
147
148            let on_output = on_output_ref.borrow().clone();
149            on_output(ReactorEvent::<R>::Finished);
150        });
151
152        RwLock::new(tx)
153    });
154
155    UseReactorBridgeHandle {
156        tx: tx.clone(),
157        ctr: ctr.dispatcher(),
158    }
159}
160
161/// Hook handle for the [`use_reactor_subscription`] hook.
162pub struct UseReactorSubscriptionHandle<R>
163where
164    R: 'static + Reactor,
165{
166    bridge: UseReactorBridgeHandle<R>,
167    outputs: Vec<Rc<<R::Scope as ReactorScoped>::Output>>,
168    finished: bool,
169    ctr: usize,
170}
171
172impl<R> UseReactorSubscriptionHandle<R>
173where
174    R: 'static + Reactor,
175{
176    /// Send an input to a reactor agent.
177    pub fn send(&self, msg: <R::Scope as ReactorScoped>::Input) {
178        self.bridge.send(msg);
179    }
180
181    /// Returns whether the current bridge has received a finish message.
182    pub fn finished(&self) -> bool {
183        self.finished
184    }
185
186    /// Reset the subscription.
187    ///
188    /// This disconnects the old bridge and re-connects the agent with a new bridge.
189    /// Existing outputs stored in the subscription will also be cleared.
190    pub fn reset(&self) {
191        self.bridge.reset();
192    }
193}
194
195impl<R> Clone for UseReactorSubscriptionHandle<R>
196where
197    R: 'static + Reactor,
198{
199    fn clone(&self) -> Self {
200        Self {
201            bridge: self.bridge.clone(),
202            outputs: self.outputs.clone(),
203            ctr: self.ctr,
204            finished: self.finished,
205        }
206    }
207}
208
209impl<R> fmt::Debug for UseReactorSubscriptionHandle<R>
210where
211    R: 'static + Reactor<Scope: ReactorScoped<Input: fmt::Debug, Output: fmt::Debug>>,
212{
213    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
214        f.debug_struct(type_name::<Self>())
215            .field("bridge", &self.bridge)
216            .field("outputs", &self.outputs)
217            .finish_non_exhaustive()
218    }
219}
220
221impl<R> Deref for UseReactorSubscriptionHandle<R>
222where
223    R: 'static + Reactor,
224{
225    type Target = [Rc<<R::Scope as ReactorScoped>::Output>];
226
227    fn deref(&self) -> &Self::Target {
228        &self.outputs
229    }
230}
231
232impl<R> PartialEq for UseReactorSubscriptionHandle<R>
233where
234    R: 'static + Reactor,
235{
236    fn eq(&self, rhs: &Self) -> bool {
237        self.bridge == rhs.bridge && self.ctr == rhs.ctr
238    }
239}
240
241/// A hook to subscribe to the outputs of a [Reactor] agent.
242///
243/// All outputs sent to current bridge will be collected into a slice.
244#[hook]
245pub fn use_reactor_subscription<R>() -> UseReactorSubscriptionHandle<R>
246where
247    R: 'static + Reactor,
248{
249    let outputs = use_reducer(OutputsState::<<R::Scope as ReactorScoped>::Output>::default);
250
251    let bridge = {
252        let outputs = outputs.clone();
253        use_reactor_bridge::<R, _>(move |output| {
254            outputs.dispatch(match output {
255                ReactorEvent::Output(m) => OutputsAction::Push(m.into()),
256                ReactorEvent::Finished => OutputsAction::Close,
257            })
258        })
259    };
260
261    {
262        let outputs = outputs.clone();
263        use_effect_with(bridge.clone(), move |_| {
264            outputs.dispatch(OutputsAction::Reset);
265
266            || {}
267        });
268    }
269
270    UseReactorSubscriptionHandle {
271        bridge,
272        outputs: outputs.inner.clone(),
273        ctr: outputs.ctr,
274        finished: outputs.closed,
275    }
276}