Skip to content
Snippets Groups Projects
Commit 25c3568d authored by Jens Nolte's avatar Jens Nolte
Browse files

Add pingObservable to force roundtrip


Co-authored-by: default avatarJan Beinke <git@janbeinke.com>
parent 81710d78
No related branches found
No related tags found
No related merge requests found
......@@ -91,13 +91,18 @@ class IsRetrievable r a => IsObservable r a | a -> r where
-> m ()
observe observable = observe (toObservable observable)
pingObservable
:: (MonadQuasar m, MonadIO m)
=> a -- ^ observable
-> m ()
toObservable :: a -> Observable r
toObservable = Observable
mapObservable :: (r -> r2) -> a -> Observable r2
mapObservable f = Observable . MappedObservable f
{-# MINIMAL toObservable | observe #-}
{-# MINIMAL toObservable | observe, pingObservable #-}
type ObservableCallback v = ObservableState v -> QuasarSTM ()
......@@ -212,6 +217,7 @@ instance IsRetrievable v (ConstObservable v) where
instance IsObservable v (ConstObservable v) where
observe (ConstObservable x) callback =
ensureQuasarSTM $ callback $ ObservableValue x
pingObservable _ = pure ()
data MappedObservable b = forall a o. IsObservable a o => MappedObservable (a -> b) o
......@@ -220,6 +226,7 @@ instance IsRetrievable v (MappedObservable v) where
instance IsObservable v (MappedObservable v) where
observe (MappedObservable fn observable) callback = observe observable (callback . fmap fn)
mapObservable f1 (MappedObservable f2 upstream) = Observable $ MappedObservable (f1 . f2) upstream
pingObservable (MappedObservable _ observable) = pingObservable observable
-- | Merge two observables using a given merge function. Whenever one of the inputs is updated, the resulting
......@@ -229,11 +236,11 @@ instance IsObservable v (MappedObservable v) where
data LiftA2Observable r = forall r0 r1. LiftA2Observable (r0 -> r1 -> r) (Observable r0) (Observable r1)
instance IsRetrievable r (LiftA2Observable r) where
retrieve (LiftA2Observable fn fx fy) = do
retrieve (LiftA2Observable fn fx fy) = liftQuasarIO do
-- LATER: keep backpressure for parallel network requests
x <- async $ retrieve fx
y <- async $ retrieve fy
liftIO $ liftA2 fn (await x) (await y)
liftA2 fn (await x) (await y)
instance IsObservable r (LiftA2Observable r) where
observe (LiftA2Observable fn fx fy) callback = do
......@@ -247,6 +254,13 @@ instance IsObservable r (LiftA2Observable r) where
observe fx (\update -> ensureSTM (writeTVar var0 (Just update)) >> callCallback)
observe fy (\update -> ensureSTM (writeTVar var1 (Just update)) >> callCallback)
pingObservable (LiftA2Observable _ fx fy) = liftQuasarIO do
-- LATER: keep backpressure for parallel network requests
x <- async $ pingObservable fx
y <- async $ pingObservable fy
await x
await y
--data BindObservable r = forall a. BindObservable (Observable a) (a -> Observable r)
--
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment