diff --git a/src/Quasar/Observable.hs b/src/Quasar/Observable.hs index 9a2e34a253db207a229235eb0b9db2b920689fb2..7f845928c5e841c8b750853b5c2787a61f18aa22 100644 --- a/src/Quasar/Observable.hs +++ b/src/Quasar/Observable.hs @@ -15,9 +15,9 @@ module Quasar.Observable ( stateObservableVar, ---- * Helper functions - --observeWhile, - --observeWhile_, - --observeBlocking, + observeBlocking, + observeUntil, + observeUntil_, -- * Helper types ObservableCallback, @@ -28,6 +28,7 @@ import Control.Monad.Catch import Control.Monad.Except import Control.Monad.Trans.Maybe import Data.HashMap.Strict qualified as HM +import Data.IORef import Data.Unique import Quasar.Async import Quasar.Future @@ -144,69 +145,69 @@ instance MonadPlus Observable ----- | Observe an observable by handling updates on the current thread. ----- ----- `observeBlocking` will run the handler whenever the observable changes (forever / until an exception is encountered). ----- ----- The handler is allowed to block. When the value changes while the handler is running the handler will be run again ----- after it completes; when the value changes multiple times it will only be executed once (with the latest value). ---observeBlocking --- :: (IsObservable v o, MonadResourceManager m, MonadIO m, MonadMask m) --- => o --- -> (ObservableState v -> m ()) --- -> m a ---observeBlocking observable handler = do --- -- `withScopedResourceManager` removes the `observe` callback when the `handler` fails. --- withScopedResourceManager do --- var <- liftIO newEmptyTMVarIO --- observe observable \msg -> liftIO $ atomically do --- void $ tryTakeTMVar var --- putTMVar var msg --- --- forever do --- msg <- liftIO $ atomically $ takeTMVar var --- handler msg --- --- ----- | Internal control flow exception for `observeWhile` and `observeWhile_`. ---data ObserveWhileCompleted = ObserveWhileCompleted --- deriving stock (Eq, Show) --- ---instance Exception ObserveWhileCompleted --- ----- | Observe until the callback returns `Just`. ---observeWhile --- :: (IsObservable v o, MonadResourceManager m, MonadIO m, MonadMask m) --- => o --- -> (ObservableState v -> m (Maybe a)) --- -> m a ---observeWhile observable callback = do --- resultVar <- liftIO $ newIORef unreachableCodePath --- observeWhile_ observable \msg -> do --- callback msg >>= \case --- Just result -> do --- liftIO $ writeIORef resultVar result --- pure False --- Nothing -> pure True --- --- liftIO $ readIORef resultVar --- --- ----- | Observe until the callback returns `False`. ---observeWhile_ --- :: (IsObservable v o, MonadResourceManager m, MonadIO m, MonadMask m) --- => o --- -> (ObservableState v -> m Bool) --- -> m () ---observeWhile_ observable callback = --- catch --- do --- observeBlocking observable \msg -> do --- continue <- callback msg --- unless continue $ throwM ObserveWhileCompleted --- \ObserveWhileCompleted -> pure () +-- | Observe an observable by handling updates on the current thread. -- +-- `observeBlocking` will run the handler whenever the observable changes (forever / until an exception is encountered). -- +-- The handler is allowed to block. When the value changes while the handler is running the handler will be run again +-- after it completes; when the value changes multiple times it will only be executed once (with the latest value). +observeBlocking + :: (IsObservable r a, MonadQuasar m, MonadIO m, MonadMask m) + => a + -> (ObservableState r -> m ()) + -> m b +observeBlocking observable handler = do + -- `withResourceScope` removes the `observe` callback when the `handler` fails. + -- TODO this also releases all resources when the handler fails - is that correct? if so it should be documented + withResourceScope do + var <- liftIO newEmptyTMVarIO + + observe observable \msg -> liftSTM do + void $ tryTakeTMVar var + putTMVar var msg + + forever do + msg <- liftIO $ atomically $ takeTMVar var + handler msg + + +-- | Internal control flow exception for `observeWhile` and `observeWhile_`. +data ObserveWhileCompleted = ObserveWhileCompleted + deriving stock (Eq, Show) + +instance Exception ObserveWhileCompleted + +-- | Observe until the callback returns `Just`. +observeUntil + :: (IsObservable r a, MonadQuasar m, MonadIO m, MonadMask m) + => a + -> (ObservableState r -> m (Maybe b)) + -> m b +observeUntil observable callback = do + resultVar <- liftIO $ newIORef unreachableCodePath + observeUntil_ observable \msg -> do + callback msg >>= \case + Just result -> do + liftIO $ writeIORef resultVar result + pure False + Nothing -> pure True + + liftIO $ readIORef resultVar + + +-- | Observe until the callback returns `False`. +observeUntil_ + :: (IsObservable r a, MonadQuasar m, MonadIO m, MonadMask m) + => a + -> (ObservableState r -> m Bool) + -> m () +observeUntil_ observable callback = + catch + do + observeBlocking observable \msg -> do + continue <- callback msg + unless continue $ throwM ObserveWhileCompleted + \ObserveWhileCompleted -> pure () newtype ConstObservable a = ConstObservable a