Skip to content
Snippets Groups Projects
Commit 70c96bb6 authored by Jens Nolte's avatar Jens Nolte
Browse files

Reimplement observeWhile


Co-authored-by: default avatarJan Beinke <git@janbeinke.com>
parent 0ea39eca
No related branches found
No related tags found
No related merge requests found
...@@ -38,6 +38,7 @@ import Control.Monad.Catch ...@@ -38,6 +38,7 @@ import Control.Monad.Catch
import Control.Monad.Except import Control.Monad.Except
import Control.Monad.Trans.Maybe import Control.Monad.Trans.Maybe
import Data.HashMap.Strict qualified as HM import Data.HashMap.Strict qualified as HM
import Data.IORef
import Data.Unique import Data.Unique
import Quasar.Async import Quasar.Async
import Quasar.Awaitable import Quasar.Awaitable
...@@ -117,36 +118,34 @@ asyncObserve :: IsObservable v o => MonadAsync m => o -> (ObservableMessage v -> ...@@ -117,36 +118,34 @@ asyncObserve :: IsObservable v o => MonadAsync m => o -> (ObservableMessage v ->
asyncObserve observable callback = toDisposable <$> async (observe observable callback) asyncObserve observable callback = toDisposable <$> async (observe observable callback)
-- | (TODO) Observe until the callback returns `False`. The callback will also be unsubscribed when the `ResourceManager` is disposed. data ObserveWhileCompleted = ObserveWhileCompleted
observeWhile :: (IsObservable v o, MonadAsync m) => o -> (ObservableMessage v -> IO Bool) -> m Disposable deriving (Eq, Show)
observeWhile observable callback = do
--disposeVar <- liftIO $ newTVarIO False
--innerDisposable <- liftIO $ observe observable \msg -> do
-- disposeRequested <- readTVarIO disposeVar
-- unless disposeRequested do
-- continue <- callback msg
-- unless continue $ atomically $ writeTVar disposeVar True
---- Bind the disposable to the ResourceManager, to prevent leaks if the `async` is disposed instance Exception ObserveWhileCompleted
--disposable <- boundDisposable $ dispose innerDisposable
--task <- async do -- | Observe until the callback returns `Just`.
-- liftIO $ atomically do observeWhile :: (IsObservable v o, MonadAwait m, MonadResourceManager m) => o -> (ObservableMessage v -> m (Maybe a)) -> m a
-- disposeRequested <- readTVar disposeVar observeWhile observable callback = do
-- unless disposeRequested retry resultVar <- liftIO $ newIORef impossibleCodePath
-- liftIO $ dispose disposable observeWhile_ observable \msg -> do
callback msg >>= \case
--pure (disposable <> (toDisposable task)) Just result -> do
liftIO $ writeIORef resultVar result
pure False
Nothing -> pure True
undefined -- TODO reimplement after ResouceManager API is changed liftIO $ readIORef resultVar
-- | Observe until the callback returns `False`. The callback will also be unsubscribed when the `ResourceManager` is disposed. -- | Observe until the callback returns `False`.
observeWhile_ :: (IsObservable v o, MonadAsync m) => o -> (ObservableMessage v -> IO Bool) -> m () observeWhile_ :: (IsObservable v o, MonadAwait m, MonadResourceManager m) => o -> (ObservableMessage v -> m Bool) -> m ()
observeWhile_ observable callback = observeWhile_ observable callback =
-- The disposable is already attached to the resource manager, so voiding it is safe. catch
void $ observeWhile observable callback do
observe observable \msg -> do
continue <- callback msg
unless continue $ throwM ObserveWhileCompleted
\ObserveWhileCompleted -> pure ()
type ObservableCallback v = ObservableMessage v -> IO () type ObservableCallback v = ObservableMessage v -> IO ()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment