Skip to content
Snippets Groups Projects
Commit 211f1f1c authored by Jens Nolte's avatar Jens Nolte
Browse files

Implement observeWhile to replace unsafe mfix-based observe variant

parent c7b02be6
No related branches found
No related tags found
No related merge requests found
......@@ -17,6 +17,8 @@ module Quasar.Observable (
modifyObservableVar_,
-- * Helper functions
observeWhile,
observeWhile_,
fnObservable,
synchronousFnObservable,
mergeObservable,
......@@ -80,15 +82,39 @@ class IsRetrievable v o => IsObservable v o | o -> v where
mapObservable :: (v -> a) -> o -> Observable a
mapObservable f = Observable . MappedObservable f
-- | A variant of `observe` that passes the `Disposable` to the callback.
--
-- The disposable passed to the callback must not be used before `observeFixed` returns (otherwise an exception is thrown).
observeFixed :: IsObservable v o => o -> (Disposable -> ObservableMessage v -> IO ()) -> IO Disposable
observeFixed observable callback = fixIO $ \disposable -> observe observable (callback disposable)
-- | Observe until the callback returns `False`. The callback will also be unsubscribed when the `ResourceManager` is disposed.
observeWhile :: (IsObservable v o, HasResourceManager m) => o -> (ObservableMessage v -> IO Bool) -> m Disposable
observeWhile observable callback = do
resourceManager <- askResourceManager
disposeVar <- liftIO $ newTVarIO False
type ObservableCallback v = ObservableMessage v -> IO ()
innerDisposable <- liftIO $ observe observable \msg -> do
disposeRequested <- atomically $ readTVar disposeVar
unless disposeRequested do
continue <- callback msg
unless continue $ atomically $ writeTVar disposeVar True
-- Bind the disposable to the ResourceManager, to prevent leaks if the `async` is disposed
disposable <- boundDisposable $ dispose innerDisposable
task <- async do
liftIO $ atomically do
disposeRequested <- readTVar disposeVar
unless disposeRequested retry
liftIO $ dispose disposable
pure (disposable <> (toDisposable task))
-- | Observe until the callback returns `False`. The callback will also be unsubscribed when the `ResourceManager` is disposed.
observeWhile_ :: (IsObservable v o, HasResourceManager m) => o -> (ObservableMessage v -> IO Bool) -> m ()
observeWhile_ observable callback =
-- The disposable is already attached to the resource manager, so voiding it is safe.
void $ observeWhile observable callback
type ObservableCallback v = ObservableMessage v -> IO ()
-- | Existential quantification wrapper for the IsObservable type class.
data Observable v = forall o. IsObservable v o => Observable o
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment