-
Jens Nolte authoredJens Nolte authored
Observable.hs 17.99 KiB
module Quasar.Observable (
-- * Observable core types
IsRetrievable(..),
IsObservable(..),
Observable(..),
ObservableState(..),
--toObservableUpdate,
-- * ObservableVar
ObservableVar,
newObservableVar,
newObservableVarIO,
setObservableVar,
modifyObservableVar,
stateObservableVar,
---- * Helper functions
--observeWhile,
--observeWhile_,
--observeBlocking,
--fnObservable,
--synchronousFnObservable,
---- * Helper types
--ObservableCallback,
) where
import Control.Applicative
import Control.Monad.Catch
import Control.Monad.Except
import Control.Monad.Trans.Maybe
import Data.HashMap.Strict qualified as HM
import Data.Unique
import Quasar.Async
import Quasar.Future
import Quasar.Prelude
import Quasar.MonadQuasar
import Quasar.MonadQuasar.Misc
import Quasar.Resources
-- | The state that is delivered to the observers of an observable.
data ObservableState a
  = ObservableValue a
  -- ^ A value is available.
  | ObservableLoading
  -- ^ No value is available (yet).
  | ObservableNotAvailable SomeException
  -- ^ No value can be provided; carries the exception describing the failure.
  deriving stock (Show, Generic)

-- | Maps over the contained value; the other two states are passed through unchanged.
instance Functor ObservableState where
  fmap fn state = case state of
    ObservableValue x -> ObservableValue (fn x)
    ObservableLoading -> ObservableLoading
    ObservableNotAvailable ex -> ObservableNotAvailable ex

-- | Left-biased combination: a non-value state on the left wins without inspecting the right-hand side; only if
-- the left side holds a value is the right side consulted.
instance Applicative ObservableState where
  pure = ObservableValue

  liftA2 fn lhs rhs = case lhs of
    ObservableNotAvailable ex -> ObservableNotAvailable ex
    ObservableLoading -> ObservableLoading
    -- The left side is a value, so the outcome is decided by the right side.
    ObservableValue x -> fmap (fn x) rhs

-- | Continues with the contained value; the other two states short-circuit.
instance Monad ObservableState where
  state >>= fn = case state of
    ObservableValue x -> fn x
    ObservableLoading -> ObservableLoading
    ObservableNotAvailable ex -> ObservableNotAvailable ex
-- TODO rename or delete
--toObservableUpdate :: MonadThrow m => ObservableState a -> m (Maybe a)
--toObservableUpdate (ObservableValue value) = pure $ Just value
--toObservableUpdate ObservableLoading = pure Nothing
--toObservableUpdate (ObservableNotAvailable ex) = throwM ex
-- | Types @a@ from which a value of type @r@ can be retrieved.
--
-- The functional dependency @a -> r@ states that the source type determines the type of the retrieved value.
class IsRetrievable r a | a -> r where
  -- | Retrieve a value from the given source.
  retrieve :: (MonadQuasar m, MonadIO m) => a -> m r
-- | Retrievable types that can additionally be observed for changes.
--
-- According to the @MINIMAL@ pragma an instance has to define either 'toObservable', or both 'observe' and
-- 'pingObservable'.
--
-- NOTE(review): 'pingObservable' has no default implementation, so an instance that only defines 'toObservable'
-- satisfies the @MINIMAL@ pragma but is left without a 'pingObservable' method.
class IsRetrievable r a => IsObservable r a | a -> r where
  -- | Register a callback that is notified about changes of the observable. The callback is invoked when the
  -- value changes; depending on the delivery method (e.g. network) intermediate values may be skipped.
  --
  -- A correct implementation must invoke the callback during registration (an `ObservableLoading` is delivered
  -- if no value is available immediately).
  --
  -- The callback should return without blocking, because other callbacks are delayed otherwise. If a value
  -- can't be processed immediately, use `observeBlocking` (currently commented out in this module) or manually
  -- hand the value over to a thread that processes the data.
  --
  -- The default implementation observes the result of 'toObservable'.
  observe
    :: MonadQuasar m
    => a -- ^ observable
    -> ObservableCallback r -- ^ callback
    -> m ()
  observe = observe . toObservable

  pingObservable
    :: (MonadQuasar m, MonadIO m)
    => a -- ^ observable
    -> m ()

  -- | Convert to the existential 'Observable' wrapper. The default simply wraps the value.
  toObservable :: a -> Observable r
  toObservable observable = Observable observable

  -- | Apply a function to the values of an observable. The default wraps the result of 'toObservable' in a
  -- 'MappedObservable'.
  mapObservable :: (r -> r2) -> a -> Observable r2
  mapObservable f observable = Observable (MappedObservable f (toObservable observable))

  {-# MINIMAL toObservable | observe, pingObservable #-}
-- | Callback that receives an 'ObservableState' and runs as a 'QuasarSTM' action without producing a result.
type ObservableCallback v = ObservableState v -> QuasarSTM ()
-- | Existential quantification wrapper for the IsObservable type class.
--
-- The concrete observable type @a@ is hidden; only the result type @r@ stays visible.
data Observable r = forall a. IsObservable r a => Observable a
-- | Retrieving from the wrapper retrieves from the wrapped observable.
instance IsRetrievable r (Observable r) where
  retrieve (Observable inner) = retrieve inner
-- | The wrapper forwards every operation to the wrapped observable.
instance IsObservable r (Observable r) where
  observe (Observable o) = observe o

  -- 'pingObservable' has no class default and the @MINIMAL@ pragma is already satisfied by 'toObservable', so
  -- the compiler does not complain when it is left out. Without this explicit forwarding, pinging an
  -- 'Observable' (as 'MappedObservable' and 'LiftA2Observable' do for their upstreams) would fail at runtime
  -- with a missing-method error.
  pingObservable (Observable o) = pingObservable o

  -- Already an 'Observable', nothing to wrap.
  toObservable = id

  -- Delegate so the wrapped type can use its own (possibly optimized) 'mapObservable'.
  mapObservable f (Observable o) = mapObservable f o
-- | Mapping over an 'Observable' is delegated to 'mapObservable'.
instance Functor Observable where
  fmap = mapObservable
-- | 'pure' yields a 'ConstObservable'; 'liftA2' merges two observables through a 'LiftA2Observable'.
instance Applicative Observable where
  pure value = toObservable (ConstObservable value)
  liftA2 fn lhs rhs = toObservable (LiftA2Observable fn lhs rhs)
--instance Monad Observable where
-- x >>= y = toObservable $ BindObservable x y
--
--instance MonadThrow Observable where
-- throwM :: forall e v. Exception e => e -> Observable v
-- throwM = toObservable . FailedObservable @v . toException
--
--instance MonadCatch Observable where
-- catch action handler = toObservable $ CatchObservable action handler
--
--instance MonadFail Observable where
-- fail = throwM . userError
--
--instance Alternative Observable where
-- empty = fail "empty"
-- x <|> y = x `catchAll` const y
--
--instance MonadPlus Observable
---- | Observe an observable by handling updates on the current thread.
----
---- `observeBlocking` will run the handler whenever the observable changes (forever / until an exception is encountered).
----
---- The handler is allowed to block. When the value changes while the handler is running the handler will be run again
---- after it completes; when the value changes multiple times it will only be executed once (with the latest value).
--observeBlocking
-- :: (IsObservable v o, MonadResourceManager m, MonadIO m, MonadMask m)
-- => o
-- -> (ObservableState v -> m ())
-- -> m a
--observeBlocking observable handler = do
-- -- `withScopedResourceManager` removes the `observe` callback when the `handler` fails.
-- withScopedResourceManager do
-- var <- liftIO newEmptyTMVarIO
-- observe observable \msg -> liftIO $ atomically do
-- void $ tryTakeTMVar var
-- putTMVar var msg
--
-- forever do
-- msg <- liftIO $ atomically $ takeTMVar var
-- handler msg
--
--
---- | Internal control flow exception for `observeWhile` and `observeWhile_`.
--data ObserveWhileCompleted = ObserveWhileCompleted
-- deriving stock (Eq, Show)
--
--instance Exception ObserveWhileCompleted
--
---- | Observe until the callback returns `Just`.
--observeWhile
-- :: (IsObservable v o, MonadResourceManager m, MonadIO m, MonadMask m)
-- => o
-- -> (ObservableState v -> m (Maybe a))
-- -> m a
--observeWhile observable callback = do
-- resultVar <- liftIO $ newIORef unreachableCodePath
-- observeWhile_ observable \msg -> do
-- callback msg >>= \case
-- Just result -> do
-- liftIO $ writeIORef resultVar result
-- pure False
-- Nothing -> pure True
--
-- liftIO $ readIORef resultVar
--
--
---- | Observe until the callback returns `False`.
--observeWhile_
-- :: (IsObservable v o, MonadResourceManager m, MonadIO m, MonadMask m)
-- => o
-- -> (ObservableState v -> m Bool)
-- -> m ()
--observeWhile_ observable callback =
-- catch
-- do
-- observeBlocking observable \msg -> do
-- continue <- callback msg
-- unless continue $ throwM ObserveWhileCompleted
-- \ObserveWhileCompleted -> pure ()
--
--
-- | An observable wrapping a single fixed value.
newtype ConstObservable r = ConstObservable r
-- | Retrieving yields the wrapped value without performing any effect.
instance IsRetrievable r (ConstObservable r) where
  retrieve (ConstObservable value) = pure value
-- | Observing delivers the wrapped value to the callback once, during registration; pinging does nothing.
instance IsObservable r (ConstObservable r) where
  observe (ConstObservable value) callback =
    ensureQuasarSTM (callback (ObservableValue value))

  pingObservable = const (pure ())
-- | An observable that applies a function to the values of an upstream observable. The value type of the
-- upstream observable is existentially hidden.
data MappedObservable r = forall a. MappedObservable (a -> r) (Observable a)
-- | Retrieves from the upstream observable and applies the mapping function to the result.
instance IsRetrievable r (MappedObservable r) where
  retrieve (MappedObservable fn upstream) = fmap fn (retrieve upstream)
-- | Observes the upstream observable and maps every delivered state before handing it to the callback.
instance IsObservable r (MappedObservable r) where
  observe (MappedObservable fn upstream) callback =
    observe upstream (\state -> callback (fn <$> state))

  pingObservable (MappedObservable _ upstream) = pingObservable upstream

  -- Mapping an already mapped observable composes the two functions instead of nesting another wrapper.
  mapObservable outer (MappedObservable inner upstream) =
    Observable (MappedObservable (outer . inner) upstream)
-- | Merge two observables using a given merge function. Whenever one of the inputs is updated, the resulting
-- observable updates according to the merge function.
--
-- There is no caching involved, every subscriber effectively subscribes to both input observables.
data LiftA2Observable r = forall a b. LiftA2Observable (a -> b -> r) (Observable a) (Observable b)
-- | Retrieves both inputs and merges the results: the second input is retrieved via 'async' while the first
-- one is retrieved directly; afterwards the future of the second retrieval is awaited.
instance IsRetrievable r (LiftA2Observable r) where
  retrieve (LiftA2Observable merge left right) = liftQuasarIO do
    -- LATER: keep backpressure for parallel network requests
    rightFuture <- async (retrieve right)
    merge <$> retrieve left <*> await rightFuture
-- | Observes both inputs and keeps the latest state of each in a 'TVar'. The callback receives the merged
-- state after every update, once each input has delivered at least one state.
instance IsObservable r (LiftA2Observable r) where
  observe (LiftA2Observable merge left right) callback = ensureQuasarSTM do
    leftVar <- liftSTM (newTVar Nothing)
    rightVar <- liftSTM (newTVar Nothing)
    let
      notify = do
        merged <- liftSTM $ runMaybeT do
          latestLeft <- MaybeT (readTVar leftVar)
          latestRight <- MaybeT (readTVar rightVar)
          pure (liftA2 merge latestLeft latestRight)
        -- Run the callback only once both values have been received
        mapM_ callback merged
    observe left \update -> do
      liftSTM (writeTVar leftVar (Just update))
      notify
    observe right \update -> do
      liftSTM (writeTVar rightVar (Just update))
      notify

  pingObservable (LiftA2Observable _ left right) = liftQuasarIO do
    -- LATER: keep backpressure for parallel network requests
    rightPing <- async (pingObservable right)
    pingObservable left
    await rightPing

  -- Mapping post-composes the function onto the merge function instead of adding another wrapper.
  mapObservable outer (LiftA2Observable inner left right) =
    Observable (LiftA2Observable (\x y -> outer (inner x y)) left right)
--data BindObservable r = forall a. BindObservable (Observable a) (a -> Observable r)
--
--instance IsRetrievable r (BindObservable r) where
-- retrieve (BindObservable fx fn) = do
-- awaitable <- retrieve fx
-- value <- liftIO $ await awaitable
-- retrieve $ fn value
--
--instance IsObservable r (BindObservable r) where
-- observe (BindObservable fx fn) callback = do
-- disposableVar <- liftIO $ newTMVarIO noDisposable
-- keyVar <- liftIO $ newTMVarIO =<< newUnique
--
-- observe fx (leftCallback disposableVar keyVar)
-- where
-- leftCallback disposableVar keyVar message = do
-- key <- liftIO newUnique
--
-- oldDisposable <- liftIO $ atomically do
-- -- Blocks while `rightCallback` is running
-- void $ swapTMVar keyVar key
--
-- takeTMVar disposableVar
--
-- disposeEventually_ oldDisposable
--
-- disposable <- case message of
-- (ObservableValue x) -> captureDisposable_ $ observe (fn x) (rightCallback keyVar key)
-- ObservableLoading -> noDisposable <$ callback ObservableLoading
-- (ObservableNotAvailable ex) -> noDisposable <$ callback (ObservableNotAvailable ex)
--
-- liftIO $ atomically $ putTMVar disposableVar disposable
--
-- rightCallback :: TMVar Unique -> Unique -> ObservableState r -> ResourceManagerIO ()
-- rightCallback keyVar key message =
-- bracket
-- -- Take key var to prevent parallel callbacks
-- (liftIO $ atomically $ takeTMVar keyVar)
-- -- Put key back
-- (liftIO . atomically . putTMVar keyVar)
-- -- Ignore all callbacks that arrive from the old `fn` when a new `fx` has been observed
-- (\currentKey -> when (key == currentKey) $ callback message)
--
--
--data CatchObservable e r = Exception e => CatchObservable (Observable r) (e -> Observable r)
--
--instance IsRetrievable r (CatchObservable e r) where
-- retrieve (CatchObservable fx fn) = retrieve fx `catch` \ex -> retrieve (fn ex)
--
--instance IsObservable r (CatchObservable e r) where
-- observe (CatchObservable fx fn) callback = do
-- disposableVar <- liftIO $ newTMVarIO noDisposable
-- keyVar <- liftIO $ newTMVarIO =<< newUnique
--
-- observe fx (leftCallback disposableVar keyVar)
-- where
-- leftCallback disposableVar keyVar message = do
-- key <- liftIO newUnique
--
-- oldDisposable <- liftIO $ atomically do
-- -- Blocks while `rightCallback` is running
-- void $ swapTMVar keyVar key
--
-- takeTMVar disposableVar
--
-- disposeEventually_ oldDisposable
--
-- disposable <- case message of
-- (ObservableNotAvailable (fromException -> Just ex)) ->
-- captureDisposable_ $ observe (fn ex) (rightCallback keyVar key)
-- msg -> noDisposable <$ callback msg
--
-- liftIO $ atomically $ putTMVar disposableVar disposable
--
-- rightCallback :: TMVar Unique -> Unique -> ObservableState r -> ResourceManagerIO ()
-- rightCallback keyVar key message =
-- bracket
-- -- Take key var to prevent parallel callbacks
-- (liftIO $ atomically $ takeTMVar keyVar)
-- -- Put key back
-- (liftIO . atomically . putTMVar keyVar)
-- -- Ignore all callbacks that arrive from the old `fn` when a new `fx` has been observed
-- (\currentKey -> when (key == currentKey) $ callback message)
--
--
-- | The callbacks registered on an observable, stored in a 'TVar' and keyed by a 'Unique' per registration.
newtype ObserverRegistry a = ObserverRegistry (TVar (HM.HashMap Unique (ObservableCallback a)))
-- | Create an empty 'ObserverRegistry' inside an 'STM' transaction.
newObserverRegistry :: STM (ObserverRegistry a)
newObserverRegistry = fmap ObserverRegistry (newTVar HM.empty)
-- | Create an empty 'ObserverRegistry' in any 'MonadIO' (uses 'newTVarIO').
newObserverRegistryIO :: MonadIO m => m (ObserverRegistry a)
newObserverRegistryIO = liftIO do
  registryVar <- newTVarIO HM.empty
  pure (ObserverRegistry registryVar)
-- | Add a callback to the registry and immediately invoke it with the given current state.
--
-- The callback is stored under a fresh 'Unique' key; a dispose transaction that removes this key again is
-- registered via 'registerDisposeTransaction_'.
registerObserver :: ObserverRegistry a -> ObservableCallback a -> ObservableState a -> QuasarSTM ()
registerObserver (ObserverRegistry registryVar) callback currentState = do
  quasar <- askQuasar
  key <- ensureSTM newUniqueSTM
  -- The stored callback is wrapped with 'execForeignQuasarSTM' using the quasar captured at registration time.
  -- NOTE(review): presumably so that later updates run the callback in the registering context - confirm
  -- against the definition of 'execForeignQuasarSTM'.
  let storedCallback = execForeignQuasarSTM quasar . callback
  ensureSTM (modifyTVar registryVar (HM.insert key storedCallback))
  registerDisposeTransaction_ (modifyTVar registryVar (HM.delete key))
  -- The initial delivery uses the unwrapped callback directly.
  callback currentState
-- | Invoke every callback that is currently stored in the registry with the given state.
updateObservers :: ObserverRegistry a -> ObservableState a -> QuasarSTM ()
updateObservers (ObserverRegistry registryVar) newState = do
  callbacks <- ensureSTM (readTVar registryVar)
  mapM_ (\notify -> notify newState) (HM.elems callbacks)
-- | A mutable observable variable: a 'TVar' holding the current value together with the registry of observers
-- that are notified when the value is changed.
data ObservableVar a = ObservableVar (TVar a) (ObserverRegistry a)
-- | Retrieving reads the current content of the underlying 'TVar'.
instance IsRetrievable a (ObservableVar a) where
  retrieve (ObservableVar valueVar _) = liftIO (readTVarIO valueVar)
-- | Observing registers the callback in the variable's registry, passing the current value as the initial
-- state; pinging does nothing.
instance IsObservable a (ObservableVar a) where
  observe (ObservableVar valueVar registry) callback = ensureQuasarSTM do
    current <- ensureSTM (readTVar valueVar)
    registerObserver registry callback (ObservableValue current)

  pingObservable = const (pure ())
-- | Create an 'ObservableVar' with the given initial value and an empty observer registry (STM variant).
newObservableVar :: MonadSTM m => a -> m (ObservableVar a)
newObservableVar x = liftSTM do
  valueVar <- newTVar x
  registry <- newObserverRegistry
  pure (ObservableVar valueVar registry)
-- | Create an 'ObservableVar' with the given initial value and an empty observer registry (IO variant).
newObservableVarIO :: MonadIO m => a -> m (ObservableVar a)
newObservableVarIO x = liftIO do
  valueVar <- newTVarIO x
  registry <- newObserverRegistryIO
  pure (ObservableVar valueVar registry)
-- | Replace the value of an 'ObservableVar'. Implemented via 'modifyObservableVar', ignoring the old value.
setObservableVar :: MonadQuasar m => ObservableVar a -> a -> m ()
setObservableVar var value = modifyObservableVar var (const value)
-- | Apply a function to the value of an 'ObservableVar'. Implemented via 'stateObservableVar' with a unit
-- result.
modifyObservableVar :: MonadQuasar m => ObservableVar a -> (a -> a) -> m ()
modifyObservableVar var f = stateObservableVar var (\oldValue -> ((), f oldValue))
-- | Run a state transition on an 'ObservableVar'.
--
-- The function receives the old value and returns a result together with the new value. The new value is
-- written to the variable, all registered observers are called with it, and the result is returned.
stateObservableVar :: MonadQuasar m => ObservableVar a -> (a -> (r, a)) -> m r
stateObservableVar (ObservableVar valueVar registry) f = ensureQuasarSTM do
  transition <- liftSTM do
    oldValue <- readTVar valueVar
    -- Kept lazy on purpose: the result pair of @f@ is not forced here.
    let applied = f oldValue
    writeTVar valueVar (snd applied)
    pure applied
  updateObservers registry (ObservableValue (snd transition))
  pure (fst transition)
--newtype ObservableVar v = ObservableVar (MVar (v, HM.HashMap Unique (ObservableCallback v)))
--instance IsRetrievable v (ObservableVar v) where
-- retrieve (ObservableVar mvar) = liftIO $ pure . fst <$> readMVar mvar
--instance IsObservable v (ObservableVar v) where
-- observe (ObservableVar mvar) callback = do
-- resourceManager <- askResourceManager
--
-- registerNewResource_ $ liftIO do
-- let wrappedCallback = enterResourceManager resourceManager . callback
--
-- key <- liftIO newUnique
--
-- modifyMVar_ mvar $ \(state, subscribers) -> do
-- -- Call listener with initial value
-- wrappedCallback (pure state)
-- pure (state, HM.insert key wrappedCallback subscribers)
--
-- atomically $ newDisposable $ disposeFn key
-- where
-- disposeFn :: Unique -> IO ()
-- disposeFn key = modifyMVar_ mvar (\(state, subscribers) -> pure (state, HM.delete key subscribers))
--
--newObservableVar :: MonadIO m => v -> m (ObservableVar v)
--newObservableVar initialValue = liftIO do
-- ObservableVar <$> newMVar (initialValue, HM.empty)
--
--setObservableVar :: MonadIO m => ObservableVar v -> v -> m ()
--setObservableVar observable value = modifyObservableVar observable (const value)
--
--stateObservableVar :: MonadIO m => ObservableVar v -> (v -> (a, v)) -> m a
--stateObservableVar (ObservableVar mvar) f =
-- liftIO $ modifyMVar mvar $ \(oldState, subscribers) -> do
-- let (result, newState) = f oldState
-- mapM_ (\callback -> callback (pure newState)) subscribers
-- pure ((newState, subscribers), result)
--
--modifyObservableVar :: MonadIO m => ObservableVar v -> (v -> v) -> m ()
--modifyObservableVar observable f = stateObservableVar observable (((), ) . f)
--
--
--
--data FnObservable v = FnObservable {
-- retrieveFn :: ResourceManagerIO (Future v),
-- observeFn :: (ObservableState v -> ResourceManagerIO ()) -> ResourceManagerIO ()
--}
--instance IsRetrievable v (FnObservable v) where
-- retrieve FnObservable{retrieveFn} = liftResourceManagerIO retrieveFn
--instance IsObservable v (FnObservable v) where
-- observe FnObservable{observeFn} callback = liftResourceManagerIO $ observeFn callback
-- mapObservable f FnObservable{retrieveFn, observeFn} = Observable $ FnObservable {
-- retrieveFn = f <<$>> retrieveFn,
-- observeFn = \listener -> observeFn (listener . fmap f)
-- }
--
---- | Implement an Observable by directly providing functions for `retrieve` and `subscribe`.
--fnObservable
-- :: ((ObservableState v -> ResourceManagerIO ()) -> ResourceManagerIO ())
-- -> ResourceManagerIO (Future v)
-- -> Observable v
--fnObservable observeFn retrieveFn = toObservable FnObservable{observeFn, retrieveFn}
--
---- | Implement an Observable by directly providing functions for `retrieve` and `subscribe`.
--synchronousFnObservable
-- :: forall v.
-- ((ObservableState v -> ResourceManagerIO ()) -> ResourceManagerIO ())
-- -> IO v
-- -> Observable v
--synchronousFnObservable observeFn synchronousRetrieveFn = fnObservable observeFn retrieveFn
-- where
-- retrieveFn :: ResourceManagerIO (Future v)
-- retrieveFn = liftIO $ pure <$> synchronousRetrieveFn
--newtype FailedObservable v = FailedObservable SomeException
--instance IsRetrievable v (FailedObservable v) where
-- retrieve (FailedObservable ex) = liftIO $ throwIO ex
--instance IsObservable v (FailedObservable v) where
-- observe (FailedObservable ex) callback = do
-- liftResourceManagerIO $ callback $ ObservableNotAvailable ex
--
--
---- TODO implement
----cacheObservable :: IsObservable v o => o -> Observable v
----cacheObservable = undefined