From 8595d1a4710d92f22b34615b33e996be01aa7353 Mon Sep 17 00:00:00 2001
From: Jens Nolte <git@queezle.net>
Date: Wed, 1 Sep 2021 22:25:16 +0200
Subject: [PATCH] Fix observe compatability wrapper deadlock

---
 src/Quasar/Observable.hs | 48 +++++++++++++++++++++++-----------------
 1 file changed, 28 insertions(+), 20 deletions(-)

diff --git a/src/Quasar/Observable.hs b/src/Quasar/Observable.hs
index 7cbb126..e8faa5d 100644
--- a/src/Quasar/Observable.hs
+++ b/src/Quasar/Observable.hs
@@ -86,30 +86,38 @@ class IsRetrievable v o => IsObservable v o | o -> v where
     msgVar <- liftIO $ newTVarIO ObservableLoading
     idVar <- liftIO $ newTVarIO (0 :: Word64)
     calledIdVar <- liftIO $ newTVarIO (0 :: Word64)
+    completedVar <- newAsyncVar
 
     resourceManager <- askResourceManager
-    bracketOnError
+    finally
       do
-        -- This implementation is a temporary compatability wrapper and forking isn't necessary with the new design.
-        forkTask do
-          attachDisposable resourceManager =<< liftIO do
-            unsafeAsyncObserveIO observable \msg -> do
-              currentMsgId <- atomically do
-                writeTVar msgVar msg
-                stateTVar idVar (dup . (+ 1))
-              -- Wait for `callback` to complete
-              atomically do
-                readTVar calledIdVar >>= \calledId ->
-                  unless (calledId >= currentMsgId) retry
-      do disposeAndAwait
+        bracketOnError
+          do
+            -- This implementation is a temporary compatability wrapper and forking isn't necessary with the new design.
+            forkTask do
+              attachDisposable resourceManager =<< liftIO do
+                unsafeAsyncObserveIO observable \msg -> do
+                  currentMsgId <- atomically do
+                    writeTVar msgVar msg
+                    stateTVar idVar (dup . (+ 1))
+                  -- Wait for `callback` to complete
+                  awaitAny2
+                    do toAwaitable completedVar
+                    do
+                      unsafeAwaitSTM do
+                        readTVar calledIdVar >>= \calledId ->
+                          unless (calledId >= currentMsgId) retry
+          do disposeAndAwait
+          do
+            const $ forever do
+              (msgId, msg) <- liftIO $ atomically do
+                msgAvailable <- liftA2 (>) (readTVar idVar) (readTVar calledIdVar)
+                unless msgAvailable retry
+                liftA2 (,) (readTVar idVar) (readTVar msgVar)
+              callback msg
+              liftIO $ atomically $ writeTVar calledIdVar msgId
       do
-        const $ forever do
-          (msgId, msg) <- liftIO $ atomically do
-            msgAvailable <- liftA2 (>) (readTVar idVar) (readTVar calledIdVar)
-            unless msgAvailable retry
-            liftA2 (,) (readTVar idVar) (readTVar msgVar)
-          callback msg
-          liftIO $ atomically $ writeTVar calledIdVar msgId
+        putAsyncVar completedVar ()
 
   unsafeAsyncObserveIO :: o -> (ObservableMessage v -> IO ()) -> IO Disposable
   unsafeAsyncObserveIO observable callback = do
-- 
GitLab