From 211f1f1c9c8a478408e892c6ef4a3a716b73506f Mon Sep 17 00:00:00 2001
From: Jens Nolte <git@queezle.net>
Date: Thu, 12 Aug 2021 17:56:23 +0200
Subject: [PATCH] Implement observeWhile to replace unsafe mfix-based observe
 variant

---
 src/Quasar/Observable.hs | 38 ++++++++++++++++++++++++++++++++------
 1 file changed, 32 insertions(+), 6 deletions(-)

diff --git a/src/Quasar/Observable.hs b/src/Quasar/Observable.hs
index 2cb17de..60bc12a 100644
--- a/src/Quasar/Observable.hs
+++ b/src/Quasar/Observable.hs
@@ -17,6 +17,8 @@ module Quasar.Observable (
   modifyObservableVar_,
 
   -- * Helper functions
+  observeWhile,
+  observeWhile_,
   fnObservable,
   synchronousFnObservable,
   mergeObservable,
@@ -80,15 +82,39 @@ class IsRetrievable v o => IsObservable v o | o -> v where
   mapObservable :: (v -> a) -> o -> Observable a
   mapObservable f = Observable . MappedObservable f
 
--- | A variant of `observe` that passes the `Disposable` to the callback.
---
--- The disposable passed to the callback must not be used before `observeFixed` returns (otherwise an exception is thrown).
-observeFixed :: IsObservable v o => o -> (Disposable -> ObservableMessage v -> IO ()) -> IO Disposable
-observeFixed observable callback = fixIO $ \disposable -> observe observable (callback disposable)
+-- | Observe until the callback returns `False`. The callback will also be unsubscribed when the `ResourceManager` is disposed.
+observeWhile :: (IsObservable v o, HasResourceManager m) => o -> (ObservableMessage v -> IO Bool) -> m Disposable
+observeWhile observable callback = do
+  resourceManager <- askResourceManager
+  disposeVar <- liftIO $ newTVarIO False
 
-type ObservableCallback v = ObservableMessage v -> IO ()
+  innerDisposable <- liftIO $ observe observable \msg -> do
+    disposeRequested <- atomically $ readTVar disposeVar
+    unless disposeRequested do
+      continue <- callback msg
+      unless continue $ atomically $ writeTVar disposeVar True
+
+  -- Bind the disposable to the ResourceManager, to prevent leaks if the `async` is disposed
+  disposable <- boundDisposable $ dispose innerDisposable
+
+  task <- async do
+    liftIO $ atomically do
+      disposeRequested <- readTVar disposeVar
+      unless disposeRequested retry
+    liftIO $ dispose disposable
+
+  pure (disposable <> (toDisposable task))
 
 
+-- | Observe until the callback returns `False`. The callback will also be unsubscribed when the `ResourceManager` is disposed.
+observeWhile_ :: (IsObservable v o, HasResourceManager m) => o -> (ObservableMessage v -> IO Bool) -> m ()
+observeWhile_ observable callback =
+  -- The disposable is already attached to the resource manager, so voiding it is safe.
+  void $ observeWhile observable callback
+
+
+type ObservableCallback v = ObservableMessage v -> IO ()
+
 
 -- | Existential quantification wrapper for the IsObservable type class.
 data Observable v = forall o. IsObservable v o => Observable o
-- 
GitLab