diff --git a/src/Quasar/Observable.hs b/src/Quasar/Observable.hs index f7fa45ee5cc03399b6806799f7df3c076d23e35b..4f50a66e58cbdabba4c5cbc9a6a9844d1360e9a0 100644 --- a/src/Quasar/Observable.hs +++ b/src/Quasar/Observable.hs @@ -91,13 +91,18 @@ class IsRetrievable r a => IsObservable r a | a -> r where -> m () observe observable = observe (toObservable observable) + pingObservable + :: (MonadQuasar m, MonadIO m) + => a -- ^ observable + -> m () + toObservable :: a -> Observable r toObservable = Observable mapObservable :: (r -> r2) -> a -> Observable r2 mapObservable f = Observable . MappedObservable f - {-# MINIMAL toObservable | observe #-} + {-# MINIMAL toObservable | observe, pingObservable #-} type ObservableCallback v = ObservableState v -> QuasarSTM () @@ -212,6 +217,7 @@ instance IsRetrievable v (ConstObservable v) where instance IsObservable v (ConstObservable v) where observe (ConstObservable x) callback = ensureQuasarSTM $ callback $ ObservableValue x + pingObservable _ = pure () data MappedObservable b = forall a o. IsObservable a o => MappedObservable (a -> b) o @@ -220,6 +226,7 @@ instance IsRetrievable v (MappedObservable v) where instance IsObservable v (MappedObservable v) where observe (MappedObservable fn observable) callback = observe observable (callback . fmap fn) mapObservable f1 (MappedObservable f2 upstream) = Observable $ MappedObservable (f1 . f2) upstream + pingObservable (MappedObservable _ observable) = pingObservable observable -- | Merge two observables using a given merge function. Whenever one of the inputs is updated, the resulting @@ -229,11 +236,11 @@ instance IsObservable v (MappedObservable v) where data LiftA2Observable r = forall r0 r1. LiftA2Observable (r0 -> r1 -> r) (Observable r0) (Observable r1) instance IsRetrievable r (LiftA2Observable r) where - retrieve (LiftA2Observable fn fx fy) = do + retrieve (LiftA2Observable fn fx fy) = liftQuasarIO do -- LATER: keep backpressure for parallel network requests x <- async $ retrieve fx y <- async $ retrieve fy - liftIO $ liftA2 fn (await x) (await y) + liftA2 fn (await x) (await y) instance IsObservable r (LiftA2Observable r) where observe (LiftA2Observable fn fx fy) callback = do @@ -247,6 +254,13 @@ instance IsObservable r (LiftA2Observable r) where observe fx (\update -> ensureSTM (writeTVar var0 (Just update)) >> callCallback) observe fy (\update -> ensureSTM (writeTVar var1 (Just update)) >> callCallback) + pingObservable (LiftA2Observable _ fx fy) = liftQuasarIO do + -- LATER: keep backpressure for parallel network requests + x <- async $ pingObservable fx + y <- async $ pingObservable fy + await x + await y + --data BindObservable r = forall a. BindObservable (Observable a) (a -> Observable r) --