diff --git a/src/Quasar/Observable.hs b/src/Quasar/Observable.hs index 4bebb340f94968c2dffcde343019ef9cd3d96d7b..941f45bcc23965c27a13786cec2e8ba122019e85 100644 --- a/src/Quasar/Observable.hs +++ b/src/Quasar/Observable.hs @@ -38,6 +38,7 @@ import Control.Monad.Catch import Control.Monad.Except import Control.Monad.Trans.Maybe import Data.HashMap.Strict qualified as HM +import Data.IORef import Data.Unique import Quasar.Async import Quasar.Awaitable @@ -117,36 +118,34 @@ asyncObserve :: IsObservable v o => MonadAsync m => o -> (ObservableMessage v -> asyncObserve observable callback = toDisposable <$> async (observe observable callback) --- | (TODO) Observe until the callback returns `False`. The callback will also be unsubscribed when the `ResourceManager` is disposed. -observeWhile :: (IsObservable v o, MonadAsync m) => o -> (ObservableMessage v -> IO Bool) -> m Disposable -observeWhile observable callback = do - --disposeVar <- liftIO $ newTVarIO False - - --innerDisposable <- liftIO $ observe observable \msg -> do - -- disposeRequested <- readTVarIO disposeVar - -- unless disposeRequested do - -- continue <- callback msg - -- unless continue $ atomically $ writeTVar disposeVar True +data ObserveWhileCompleted = ObserveWhileCompleted + deriving (Eq, Show) - ---- Bind the disposable to the ResourceManager, to prevent leaks if the `async` is disposed - --disposable <- boundDisposable $ dispose innerDisposable +instance Exception ObserveWhileCompleted - --task <- async do - -- liftIO $ atomically do - -- disposeRequested <- readTVar disposeVar - -- unless disposeRequested retry - -- liftIO $ dispose disposable - - --pure (disposable <> (toDisposable task)) +-- | Observe until the callback returns `Just`. +observeWhile :: (IsObservable v o, MonadAwait m, MonadResourceManager m) => o -> (ObservableMessage v -> m (Maybe a)) -> m a +observeWhile observable callback = do + resultVar <- liftIO $ newIORef impossibleCodePath + observeWhile_ observable \msg -> do + callback msg >>= \case + Just result -> do + liftIO $ writeIORef resultVar result + pure False + Nothing -> pure True - undefined -- TODO reimplement after ResouceManager API is changed + liftIO $ readIORef resultVar --- | Observe until the callback returns `False`. The callback will also be unsubscribed when the `ResourceManager` is disposed. -observeWhile_ :: (IsObservable v o, MonadAsync m) => o -> (ObservableMessage v -> IO Bool) -> m () +-- | Observe until the callback returns `False`. +observeWhile_ :: (IsObservable v o, MonadAwait m, MonadResourceManager m) => o -> (ObservableMessage v -> m Bool) -> m () observeWhile_ observable callback = - -- The disposable is already attached to the resource manager, so voiding it is safe. - void $ observeWhile observable callback + catch + do + observe observable \msg -> do + continue <- callback msg + unless continue $ throwM ObserveWhileCompleted + \ObserveWhileCompleted -> pure () type ObservableCallback v = ObservableMessage v -> IO ()