Skip to content
Snippets Groups Projects
Commit 9f667c79 authored by Jens Nolte's avatar Jens Nolte
Browse files

Add blocking `observe` helper functions

parent 5ea0f706
No related branches found
No related tags found
No related merge requests found
Pipeline #2731 passed
......@@ -15,9 +15,9 @@ module Quasar.Observable (
stateObservableVar,
---- * Helper functions
--observeWhile,
--observeWhile_,
--observeBlocking,
observeBlocking,
observeUntil,
observeUntil_,
-- * Helper types
ObservableCallback,
......@@ -28,6 +28,7 @@ import Control.Monad.Catch
import Control.Monad.Except
import Control.Monad.Trans.Maybe
import Data.HashMap.Strict qualified as HM
import Data.IORef
import Data.Unique
import Quasar.Async
import Quasar.Future
......@@ -144,69 +145,69 @@ instance MonadPlus Observable
---- | Observe an observable by handling updates on the current thread.
----
---- `observeBlocking` will run the handler whenever the observable changes (forever / until an exception is encountered).
----
---- The handler is allowed to block. When the value changes while the handler is running the handler will be run again
---- after it completes; when the value changes multiple times it will only be executed once (with the latest value).
--observeBlocking
-- :: (IsObservable v o, MonadResourceManager m, MonadIO m, MonadMask m)
-- => o
-- -> (ObservableState v -> m ())
-- -> m a
--observeBlocking observable handler = do
-- -- `withScopedResourceManager` removes the `observe` callback when the `handler` fails.
-- withScopedResourceManager do
-- var <- liftIO newEmptyTMVarIO
-- observe observable \msg -> liftIO $ atomically do
-- void $ tryTakeTMVar var
-- putTMVar var msg
--
-- forever do
-- msg <- liftIO $ atomically $ takeTMVar var
-- handler msg
--
--
---- | Internal control flow exception for `observeWhile` and `observeWhile_`.
--data ObserveWhileCompleted = ObserveWhileCompleted
-- deriving stock (Eq, Show)
--
--instance Exception ObserveWhileCompleted
--
---- | Observe until the callback returns `Just`.
--observeWhile
-- :: (IsObservable v o, MonadResourceManager m, MonadIO m, MonadMask m)
-- => o
-- -> (ObservableState v -> m (Maybe a))
-- -> m a
--observeWhile observable callback = do
-- resultVar <- liftIO $ newIORef unreachableCodePath
-- observeWhile_ observable \msg -> do
-- callback msg >>= \case
-- Just result -> do
-- liftIO $ writeIORef resultVar result
-- pure False
-- Nothing -> pure True
--
-- liftIO $ readIORef resultVar
--
--
---- | Observe until the callback returns `False`.
--observeWhile_
-- :: (IsObservable v o, MonadResourceManager m, MonadIO m, MonadMask m)
-- => o
-- -> (ObservableState v -> m Bool)
-- -> m ()
--observeWhile_ observable callback =
-- catch
-- do
-- observeBlocking observable \msg -> do
-- continue <- callback msg
-- unless continue $ throwM ObserveWhileCompleted
-- \ObserveWhileCompleted -> pure ()
-- | Observe an observable by handling updates on the current thread.
--
-- `observeBlocking` will run the handler whenever the observable changes (forever / until an exception is encountered).
--
-- The handler is allowed to block. When the value changes while the handler is running the handler will be run again
-- after it completes; when the value changes multiple times it will only be executed once (with the latest value).
observeBlocking
:: (IsObservable r a, MonadQuasar m, MonadIO m, MonadMask m)
=> a
-> (ObservableState r -> m ())
-> m b
observeBlocking observable handler = do
-- `withResourceScope` removes the `observe` callback when the `handler` fails.
-- TODO this also releases all resources when the handler fails - is that correct? if so it should be documented
withResourceScope do
var <- liftIO newEmptyTMVarIO
observe observable \msg -> liftSTM do
void $ tryTakeTMVar var
putTMVar var msg
forever do
msg <- liftIO $ atomically $ takeTMVar var
handler msg
-- | Internal control flow exception for `observeWhile` and `observeWhile_`.
data ObserveWhileCompleted = ObserveWhileCompleted
deriving stock (Eq, Show)
instance Exception ObserveWhileCompleted
-- | Observe until the callback returns `Just`.
observeUntil
:: (IsObservable r a, MonadQuasar m, MonadIO m, MonadMask m)
=> a
-> (ObservableState r -> m (Maybe b))
-> m b
observeUntil observable callback = do
resultVar <- liftIO $ newIORef unreachableCodePath
observeUntil_ observable \msg -> do
callback msg >>= \case
Just result -> do
liftIO $ writeIORef resultVar result
pure False
Nothing -> pure True
liftIO $ readIORef resultVar
-- | Observe until the callback returns `False`.
observeUntil_
:: (IsObservable r a, MonadQuasar m, MonadIO m, MonadMask m)
=> a
-> (ObservableState r -> m Bool)
-> m ()
observeUntil_ observable callback =
catch
do
observeBlocking observable \msg -> do
continue <- callback msg
unless continue $ throwM ObserveWhileCompleted
\ObserveWhileCompleted -> pure ()
newtype ConstObservable a = ConstObservable a
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment