From 5a3a31727de9a2b1817fa105209affb0ca0faa2f Mon Sep 17 00:00:00 2001
From: Jens Nolte <git@queezle.net>
Date: Sun, 5 Sep 2021 00:00:18 +0200
Subject: [PATCH] Rewrite observeBlocking for new observable signature

Co-authored-by: Jan Beinke <git@janbeinke.com>
---
 src/Quasar/Observable.hs | 49 ++++++++++++----------------------------
 1 file changed, 14 insertions(+), 35 deletions(-)

diff --git a/src/Quasar/Observable.hs b/src/Quasar/Observable.hs
index 2a62540..b87fec5 100644
--- a/src/Quasar/Observable.hs
+++ b/src/Quasar/Observable.hs
@@ -113,41 +113,20 @@ class IsRetrievable v o => IsObservable v o | o -> v where
 -- | Observes an observable by handling updates on the current thread.
 observeBlocking :: (IsObservable v o, MonadResourceManager m) => o -> (ObservableMessage v -> m ()) -> m a
 observeBlocking observable callback = do
-  msgVar <- liftIO $ newTVarIO ObservableLoading
-  idVar <- liftIO $ newTVarIO (0 :: Word64)
-  calledIdVar <- liftIO $ newTVarIO (0 :: Word64)
-  completedVar <- newAsyncVar
-
-  resourceManager <- askResourceManager
-  finally
-    do
-      bracketOnError
-        do
-          -- This implementation is a temporary compatability wrapper and forking isn't necessary with the new design.
-          forkTask do
-            attachDisposable resourceManager =<< liftIO do
-              oldObserve observable \msg -> do
-                currentMsgId <- atomically do
-                  writeTVar msgVar msg
-                  stateTVar idVar (dup . (+ 1))
-                -- Wait for `callback` to complete
-                awaitAny2
-                  do toAwaitable completedVar
-                  do
-                    unsafeAwaitSTM do
-                      readTVar calledIdVar >>= \calledId ->
-                        unless (calledId >= currentMsgId) retry
-        do disposeAndAwait
-        do
-          const $ forever do
-            (msgId, msg) <- liftIO $ atomically do
-              msgAvailable <- liftA2 (>) (readTVar idVar) (readTVar calledIdVar)
-              unless msgAvailable retry
-              liftA2 (,) (readTVar idVar) (readTVar msgVar)
-            callback msg
-            liftIO $ atomically $ writeTVar calledIdVar msgId
-    do
-      putAsyncVar completedVar ()
+  withSubResourceManagerM do
+    var <- liftIO newEmptyTMVarIO
+    observe observable \msg -> do
+      liftIO $ atomically do
+        cbCompletedVar <- tryTakeTMVar var >>= \case
+          Nothing ->  newAsyncVarSTM
+          Just (_, cbCompletedVar) -> pure cbCompletedVar
+        putTMVar var (msg, cbCompletedVar)
+        pure $ toAwaitable cbCompletedVar
+
+    forever do
+      (msg, cbCompletedVar) <- liftIO $ atomically $ takeTMVar var
+      callback msg
+      putAsyncVar_ cbCompletedVar ()
 
 
 asyncObserve :: IsObservable v o => MonadAsync m => o -> (ObservableMessage v -> m ()) -> m ()
-- 
GitLab