diff --git a/src/Quasar/Observable.hs b/src/Quasar/Observable.hs index c38b16d50de93fb0f9674e381b5276cb92447497..da31a333ad3b6dc8163b927d73e1e217796fdb3c 100644 --- a/src/Quasar/Observable.hs +++ b/src/Quasar/Observable.hs @@ -9,7 +9,6 @@ module Quasar.Observable ( ObservableMessage(..), toObservableUpdate, asyncObserve, - observeBlocking, -- * ObservableVar ObservableVar, @@ -22,6 +21,7 @@ module Quasar.Observable ( -- * Helper functions observeWhile, observeWhile_, + observeBlocking, fnObservable, synchronousFnObservable, mergeObservable, @@ -38,6 +38,7 @@ import Control.Concurrent.MVar import Control.Concurrent.STM import Control.Monad.Catch import Control.Monad.Except +import Control.Monad.Reader import Control.Monad.Trans.Maybe import Data.HashMap.Strict qualified as HM import Data.IORef @@ -47,7 +48,6 @@ import Quasar.Awaitable import Quasar.Disposable import Quasar.Prelude - data ObservableMessage a = ObservableUpdate a | ObservableLoading @@ -80,11 +80,24 @@ class IsRetrievable v a | a -> v where retrieveIO :: IsRetrievable v a => a -> IO v retrieveIO x = withOnResourceManager $ await =<< retrieve x -{-# DEPRECATED oldObserve "Old implementation of `observe`." #-} class IsRetrievable v o => IsObservable v o | o -> v where + observe + :: MonadResourceManager m + => o -- ^ observable + -> (forall f. MonadResourceManager f => ObservableMessage v -> f (Awaitable ())) -- ^ callback + -> m () + -- NOTE Compatability implementation, has to be removed when `oldObserve` is removed + observe observable callback = mask_ do + resourceManager <- askResourceManager + disposable <- liftIO $ oldObserve observable (\msg -> runReaderT (await =<< callback msg) resourceManager) + registerDisposable disposable + oldObserve :: o -> (ObservableMessage v -> IO ()) -> IO Disposable oldObserve observable callback = do - forkTask_ $ withOnResourceManager $ observeBlocking observable (liftIO . callback) + resourceManager <- unsafeNewResourceManager + onResourceManager resourceManager do + observe observable $ \msg -> liftIO (callback msg) >> pure (pure ()) + pure $ toDisposable resourceManager toObservable :: o -> Observable v toObservable = Observable @@ -92,7 +105,12 @@ class IsRetrievable v o => IsObservable v o | o -> v where mapObservable :: (v -> a) -> o -> Observable a mapObservable f = Observable . MappedObservable f + {-# MINIMAL observe | oldObserve #-} + +{-# DEPRECATED oldObserve "Old implementation of `observe`." #-} + +-- | Observes an observable by handling updates on the current thread. observeBlocking :: (IsObservable v o, MonadResourceManager m) => o -> (ObservableMessage v -> m ()) -> m a observeBlocking observable callback = do msgVar <- liftIO $ newTVarIO ObservableLoading @@ -174,6 +192,7 @@ data Observable v = forall o. IsObservable v o => Observable o instance IsRetrievable v (Observable v) where retrieve (Observable o) = retrieve o instance IsObservable v (Observable v) where + observe (Observable o) = observe o oldObserve (Observable o) = oldObserve o toObservable = id mapObservable f (Observable o) = mapObservable f o @@ -210,7 +229,8 @@ data MappedObservable b = forall a o. IsObservable a o => MappedObservable (a -> instance IsRetrievable v (MappedObservable v) where retrieve (MappedObservable f observable) = f <<$>> retrieve observable instance IsObservable v (MappedObservable v) where - oldObserve (MappedObservable f observable) callback = oldObserve observable (callback . fmap f) + observe (MappedObservable fn observable) callback = observe observable (callback . fmap fn) + oldObserve (MappedObservable fn observable) callback = oldObserve observable (callback . fmap fn) mapObservable f1 (MappedObservable f2 upstream) = Observable $ MappedObservable (f1 . f2) upstream @@ -476,18 +496,16 @@ newtype ConstObservable v = ConstObservable v instance IsRetrievable v (ConstObservable v) where retrieve (ConstObservable x) = pure $ pure x instance IsObservable v (ConstObservable v) where - oldObserve (ConstObservable x) callback = do - callback $ ObservableUpdate x - pure noDisposable + observe (ConstObservable x) callback = do + void $ callback $ ObservableUpdate x newtype FailedObservable v = FailedObservable SomeException instance IsRetrievable v (FailedObservable v) where retrieve (FailedObservable ex) = liftIO $ throwIO ex instance IsObservable v (FailedObservable v) where - oldObserve (FailedObservable ex) callback = do - callback $ ObservableNotAvailable ex - pure noDisposable + observe (FailedObservable ex) callback = do + void $ callback $ ObservableNotAvailable ex -- | Create an observable by simply running an IO action whenever a value is requested or a callback is registered.