From 9f667c7989c1866629816105267206c7d5cc4381 Mon Sep 17 00:00:00 2001
From: Jens Nolte <git@queezle.net>
Date: Sun, 13 Mar 2022 22:48:24 +0100
Subject: [PATCH] Add blocking `observe` helper functions

---
 src/Quasar/Observable.hs | 129 ++++++++++++++++++++-------------------
 1 file changed, 65 insertions(+), 64 deletions(-)

diff --git a/src/Quasar/Observable.hs b/src/Quasar/Observable.hs
index 9a2e34a..7f84592 100644
--- a/src/Quasar/Observable.hs
+++ b/src/Quasar/Observable.hs
@@ -15,9 +15,9 @@ module Quasar.Observable (
   stateObservableVar,
 
   ---- * Helper functions
-  --observeWhile,
-  --observeWhile_,
-  --observeBlocking,
+  observeBlocking,
+  observeUntil,
+  observeUntil_,
 
   -- * Helper types
   ObservableCallback,
@@ -28,6 +28,7 @@ import Control.Monad.Catch
 import Control.Monad.Except
 import Control.Monad.Trans.Maybe
 import Data.HashMap.Strict qualified as HM
+import Data.IORef
 import Data.Unique
 import Quasar.Async
 import Quasar.Future
@@ -144,69 +145,69 @@ instance MonadPlus Observable
 
 
 
----- | Observe an observable by handling updates on the current thread.
-----
----- `observeBlocking` will run the handler whenever the observable changes (forever / until an exception is encountered).
-----
----- The handler is allowed to block. When the value changes while the handler is running the handler will be run again
----- after it completes; when the value changes multiple times it will only be executed once (with the latest value).
---observeBlocking
---  :: (IsObservable v o, MonadResourceManager m, MonadIO m, MonadMask m)
---  => o
---  -> (ObservableState v -> m ())
---  -> m a
---observeBlocking observable handler = do
---  -- `withScopedResourceManager` removes the `observe` callback when the `handler` fails.
---  withScopedResourceManager do
---    var <- liftIO newEmptyTMVarIO
---    observe observable \msg -> liftIO $ atomically do
---      void $ tryTakeTMVar var
---      putTMVar var msg
---
---    forever do
---      msg <- liftIO $ atomically $ takeTMVar var
---      handler msg
---
---
----- | Internal control flow exception for `observeWhile` and `observeWhile_`.
---data ObserveWhileCompleted = ObserveWhileCompleted
---  deriving stock (Eq, Show)
---
---instance Exception ObserveWhileCompleted
---
----- | Observe until the callback returns `Just`.
---observeWhile
---  :: (IsObservable v o, MonadResourceManager m, MonadIO m, MonadMask m)
---  => o
---  -> (ObservableState v -> m (Maybe a))
---  -> m a
---observeWhile observable callback = do
---  resultVar <- liftIO $ newIORef unreachableCodePath
---  observeWhile_ observable \msg -> do
---    callback msg >>= \case
---      Just result -> do
---        liftIO $ writeIORef resultVar result
---        pure False
---      Nothing -> pure True
---
---  liftIO $ readIORef resultVar
---
---
----- | Observe until the callback returns `False`.
---observeWhile_
---  :: (IsObservable v o, MonadResourceManager m, MonadIO m, MonadMask m)
---  => o
---  -> (ObservableState v -> m Bool)
---  -> m ()
---observeWhile_ observable callback =
---  catch
---    do
---      observeBlocking observable \msg -> do
---        continue <- callback msg
---        unless continue $ throwM ObserveWhileCompleted
---    \ObserveWhileCompleted -> pure ()
+-- | Observe an observable by handling updates on the current thread.
 --
+-- `observeBlocking` will run the handler whenever the observable changes (forever / until an exception is encountered).
 --
+-- The handler is allowed to block. When the value changes while the handler is running the handler will be run again
+-- after it completes; when the value changes multiple times it will only be executed once (with the latest value).
+observeBlocking
+  :: (IsObservable r a, MonadQuasar m, MonadIO m, MonadMask m)
+  => a
+  -> (ObservableState r -> m ())
+  -> m b
+observeBlocking observable handler = do
+  -- `withResourceScope` removes the `observe` callback when the `handler` fails.
+  -- TODO this also releases all resources when the handler fails - is that correct? if so it should be documented
+  withResourceScope do
+    var <- liftIO newEmptyTMVarIO
+
+    observe observable \msg -> liftSTM do
+      void $ tryTakeTMVar var
+      putTMVar var msg
+
+    forever do
+      msg <- liftIO $ atomically $ takeTMVar var
+      handler msg
+
+
+-- | Internal control flow exception for `observeWhile` and `observeWhile_`.
+data ObserveWhileCompleted = ObserveWhileCompleted
+  deriving stock (Eq, Show)
+
+instance Exception ObserveWhileCompleted
+
+-- | Observe until the callback returns `Just`.
+observeUntil
+  :: (IsObservable r a, MonadQuasar m, MonadIO m, MonadMask m)
+  => a
+  -> (ObservableState r -> m (Maybe b))
+  -> m b
+observeUntil observable callback = do
+  resultVar <- liftIO $ newIORef unreachableCodePath
+  observeUntil_ observable \msg -> do
+    callback msg >>= \case
+      Just result -> do
+        liftIO $ writeIORef resultVar result
+        pure False
+      Nothing -> pure True
+
+  liftIO $ readIORef resultVar
+
+
+-- | Observe until the callback returns `False`.
+observeUntil_
+  :: (IsObservable r a, MonadQuasar m, MonadIO m, MonadMask m)
+  => a
+  -> (ObservableState r -> m Bool)
+  -> m ()
+observeUntil_ observable callback =
+  catch
+    do
+      observeBlocking observable \msg -> do
+        continue <- callback msg
+        unless continue $ throwM ObserveWhileCompleted
+    \ObserveWhileCompleted -> pure ()
 
 
 newtype ConstObservable a = ConstObservable a
-- 
GitLab