diff --git a/src/Quasar/Observable.hs b/src/Quasar/Observable.hs index 2cb17dee3a39221133ea944848115937feeb5edc..60bc12ab8e65a40e1334aaaa2784577cb4210a53 100644 --- a/src/Quasar/Observable.hs +++ b/src/Quasar/Observable.hs @@ -17,6 +17,8 @@ module Quasar.Observable ( modifyObservableVar_, -- * Helper functions + observeWhile, + observeWhile_, fnObservable, synchronousFnObservable, mergeObservable, @@ -80,15 +82,39 @@ class IsRetrievable v o => IsObservable v o | o -> v where mapObservable :: (v -> a) -> o -> Observable a mapObservable f = Observable . MappedObservable f --- | A variant of `observe` that passes the `Disposable` to the callback. --- --- The disposable passed to the callback must not be used before `observeFixed` returns (otherwise an exception is thrown). -observeFixed :: IsObservable v o => o -> (Disposable -> ObservableMessage v -> IO ()) -> IO Disposable -observeFixed observable callback = fixIO $ \disposable -> observe observable (callback disposable) +-- | Observe until the callback returns `False`. The callback will also be unsubscribed when the `ResourceManager` is disposed. +observeWhile :: (IsObservable v o, HasResourceManager m) => o -> (ObservableMessage v -> IO Bool) -> m Disposable +observeWhile observable callback = do + resourceManager <- askResourceManager + disposeVar <- liftIO $ newTVarIO False -type ObservableCallback v = ObservableMessage v -> IO () + innerDisposable <- liftIO $ observe observable \msg -> do + disposeRequested <- atomically $ readTVar disposeVar + unless disposeRequested do + continue <- callback msg + unless continue $ atomically $ writeTVar disposeVar True + + -- Bind the disposable to the ResourceManager, to prevent leaks if the `async` is disposed + disposable <- boundDisposable $ dispose innerDisposable + + task <- async do + liftIO $ atomically do + disposeRequested <- readTVar disposeVar + unless disposeRequested retry + liftIO $ dispose disposable + + pure (disposable <> (toDisposable task)) +-- | Observe until the callback returns `False`. The callback will also be unsubscribed when the `ResourceManager` is disposed. +observeWhile_ :: (IsObservable v o, HasResourceManager m) => o -> (ObservableMessage v -> IO Bool) -> m () +observeWhile_ observable callback = + -- The disposable is already attached to the resource manager, so voiding it is safe. + void $ observeWhile observable callback + + +type ObservableCallback v = ObservableMessage v -> IO () + -- | Existential quantification wrapper for the IsObservable type class. data Observable v = forall o. IsObservable v o => Observable o