From 25c3568da0f58bd7055dee8cf59f36546570795f Mon Sep 17 00:00:00 2001 From: Jens Nolte <git@queezle.net> Date: Sun, 13 Mar 2022 01:10:30 +0100 Subject: [PATCH] Add pingObservable to force roundtrip Co-authored-by: Jan Beinke <git@janbeinke.com> --- src/Quasar/Observable.hs | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/src/Quasar/Observable.hs b/src/Quasar/Observable.hs index f7fa45e..4f50a66 100644 --- a/src/Quasar/Observable.hs +++ b/src/Quasar/Observable.hs @@ -91,13 +91,18 @@ class IsRetrievable r a => IsObservable r a | a -> r where -> m () observe observable = observe (toObservable observable) + pingObservable + :: (MonadQuasar m, MonadIO m) + => a -- ^ observable + -> m () + toObservable :: a -> Observable r toObservable = Observable mapObservable :: (r -> r2) -> a -> Observable r2 mapObservable f = Observable . MappedObservable f - {-# MINIMAL toObservable | observe #-} + {-# MINIMAL toObservable | observe, pingObservable #-} type ObservableCallback v = ObservableState v -> QuasarSTM () @@ -212,6 +217,7 @@ instance IsRetrievable v (ConstObservable v) where instance IsObservable v (ConstObservable v) where observe (ConstObservable x) callback = ensureQuasarSTM $ callback $ ObservableValue x + pingObservable _ = pure () data MappedObservable b = forall a o. IsObservable a o => MappedObservable (a -> b) o @@ -220,6 +226,7 @@ instance IsRetrievable v (MappedObservable v) where instance IsObservable v (MappedObservable v) where observe (MappedObservable fn observable) callback = observe observable (callback . fmap fn) mapObservable f1 (MappedObservable f2 upstream) = Observable $ MappedObservable (f1 . f2) upstream + pingObservable (MappedObservable _ observable) = pingObservable observable -- | Merge two observables using a given merge function. Whenever one of the inputs is updated, the resulting @@ -229,11 +236,11 @@ instance IsObservable v (MappedObservable v) where data LiftA2Observable r = forall r0 r1. LiftA2Observable (r0 -> r1 -> r) (Observable r0) (Observable r1) instance IsRetrievable r (LiftA2Observable r) where - retrieve (LiftA2Observable fn fx fy) = do + retrieve (LiftA2Observable fn fx fy) = liftQuasarIO do -- LATER: keep backpressure for parallel network requests x <- async $ retrieve fx y <- async $ retrieve fy - liftIO $ liftA2 fn (await x) (await y) + liftA2 fn (await x) (await y) instance IsObservable r (LiftA2Observable r) where observe (LiftA2Observable fn fx fy) callback = do @@ -247,6 +254,13 @@ instance IsObservable r (LiftA2Observable r) where observe fx (\update -> ensureSTM (writeTVar var0 (Just update)) >> callCallback) observe fy (\update -> ensureSTM (writeTVar var1 (Just update)) >> callCallback) + pingObservable (LiftA2Observable _ fx fy) = liftQuasarIO do + -- LATER: keep backpressure for parallel network requests + x <- async $ pingObservable fx + y <- async $ pingObservable fy + await x + await y + --data BindObservable r = forall a. BindObservable (Observable a) (a -> Observable r) -- -- GitLab