From 2031ee9b9f31652821e132813057b039b8b8d624 Mon Sep 17 00:00:00 2001
From: Jens Nolte <git@queezle.net>
Date: Mon, 9 Aug 2021 19:20:48 +0200
Subject: [PATCH] Offer both a synchronous and an asynchronous variant of
 FnObservable

---
 src/Quasar/Observable.hs                   | 93 ++++++++++++++++------
 src/Quasar/Observable/ObservableHashMap.hs |  2 +-
 2 files changed, 69 insertions(+), 26 deletions(-)

diff --git a/src/Quasar/Observable.hs b/src/Quasar/Observable.hs
index dbf3902..63cf838 100644
--- a/src/Quasar/Observable.hs
+++ b/src/Quasar/Observable.hs
@@ -17,6 +17,8 @@ module Quasar.Observable (
   modifyObservableVar_,
 
   -- * Helper functions
+  fnObservable,
+  synchronousFnObservable,
   mergeObservable,
   mergeObservableMaybe,
   joinObservable,
@@ -27,13 +29,13 @@ module Quasar.Observable (
   bindObservable,
 
   -- * Helper types
-  FnObservable(..),
   ObservableCallback,
 ) where
 
 import Control.Concurrent.MVar
 import Control.Concurrent.STM
 import Control.Monad.Except
+import Control.Monad.Reader
 import Control.Monad.Trans.Maybe
 import Data.HashMap.Strict qualified as HM
 import Data.IORef
@@ -176,6 +178,17 @@ bindObservable :: (IsObservable a ma, IsObservable b mb) => ma -> (a -> mb) -> O
 bindObservable x fy = joinObservable $ mapObservable fy x
 
 
+-- | Internal state of `JoinedObservable`
+data JoinedObservableState
+  = JoinedObservableInactive
+  | JoinedObservableActive Unique Disposable
+  | JoinedObservableDisposed
+
+instance IsDisposable JoinedObservableState where
+  dispose JoinedObservableInactive = pure $ successfulAwaitable ()
+  dispose (JoinedObservableActive _ disposable) = dispose disposable
+  dispose JoinedObservableDisposed = pure $ successfulAwaitable ()
+
 newtype JoinedObservable o = JoinedObservable o
 instance forall v o i. (IsRetrievable i o, IsRetrievable v i) => IsRetrievable v (JoinedObservable o) where
   retrieve :: HasResourceManager m => JoinedObservable o -> m (AsyncTask v)
@@ -183,27 +196,41 @@ instance forall v o i. (IsRetrievable i o, IsRetrievable v i) => IsRetrievable v
 instance forall v o i. (IsObservable i o, IsObservable v i) => IsObservable v (JoinedObservable o) where
   observe :: JoinedObservable o -> (ObservableMessage v -> IO ()) -> IO Disposable
   observe (JoinedObservable outer) callback = do
-    -- TODO: rewrite with latest semantics
-    -- the current implementation blocks the callback while `dispose` is running
-    innerDisposableMVar <- newMVar Nothing
-    outerDisposable <- observe outer (outerCallback innerDisposableMVar)
-    pure $ mkDisposable $ do
-      -- TODO use `disposeEventually` to immediately deregister handler (ignoring messages from the old callback after that)
-      undefined
-      --dispose outerDisposable
-      --mapM_ dispose =<< liftIO (readMVar innerDisposableMVar)
+    -- Create a resource manager to ensure all subscriptions are cleaned up when disposing.
+    resourceManager <- newResourceManager unlimitedResourceManagerConfiguration
+
+    stateMVar <- newMVar JoinedObservableInactive
+    outerDisposable <- observe outer (outerCallback resourceManager stateMVar)
+
+    attachDisposeAction_ resourceManager $ do
+      d1 <- dispose outerDisposable
+      d2 <- modifyMVar stateMVar $ \state -> do
+        d2temp <- dispose state
+        pure (JoinedObservableDisposed, d2temp)
+      pure $ d1 <> d2
+
+    pure $ toDisposable resourceManager
       where
-        outerCallback :: MVar (Maybe Disposable) -> ObservableMessage i -> IO ()
-        outerCallback innerDisposableMVar message = do
-          oldInnerDisposable <- takeMVar innerDisposableMVar
-          mapM_ disposeIO oldInnerDisposable
-          newInnerDisposable <- outerCallbackObserve message
-          putMVar innerDisposableMVar newInnerDisposable
-        outerCallbackObserve :: ObservableMessage i -> IO (Maybe Disposable)
-        outerCallbackObserve (ObservableUpdate innerObservable) = Just <$> observe innerObservable callback
-        outerCallbackObserve ObservableConnecting = Nothing <$ callback ObservableConnecting
-        outerCallbackObserve (ObservableReconnecting ex) = Nothing <$ callback (ObservableReconnecting ex)
-        outerCallbackObserve (ObservableNotAvailable ex) = Nothing <$ callback (ObservableNotAvailable ex)
+        outerCallback :: ResourceManager -> MVar JoinedObservableState -> ObservableMessage i -> IO ()
+        outerCallback resourceManager stateMVar message = do
+          oldState <- takeMVar stateMVar
+          disposeEventually resourceManager oldState
+          key <- newUnique
+          newState <- outerCallbackObserve key message
+          putMVar stateMVar newState
+          where
+            outerCallbackObserve :: Unique -> ObservableMessage i -> IO JoinedObservableState
+            outerCallbackObserve key (ObservableUpdate innerObservable) = JoinedObservableActive key <$> observe innerObservable (filteredCallback key)
+            outerCallbackObserve _ ObservableConnecting = JoinedObservableInactive <$ callback ObservableConnecting
+            outerCallbackObserve _ (ObservableReconnecting ex) = JoinedObservableInactive <$ callback (ObservableReconnecting ex)
+            outerCallbackObserve _ (ObservableNotAvailable ex) = JoinedObservableInactive <$ callback (ObservableNotAvailable ex)
+            filteredCallback :: Unique -> ObservableMessage v -> IO ()
+            filteredCallback key msg =
+              -- TODO write a version that does not deadlock when `observe` calls the callback directly
+              undefined
+              withMVar stateMVar $ \case
+                JoinedObservableActive activeKey _ -> callback msg
+                _ -> pure ()
 
 joinObservable :: (IsObservable i o, IsObservable v i) => o -> Observable v
 joinObservable = Observable . JoinedObservable
@@ -255,20 +282,36 @@ mergeObservableMaybe :: (IsObservable (Maybe v0) o0, IsObservable (Maybe v1) o1)
 mergeObservableMaybe merge x y = Observable $ MergedObservable (liftA2 merge) x y
 
 
--- | Data type that can be used as an implementation for the `IsObservable` interface that works by directly providing functions for `retrieve` and `subscribe`.
 data FnObservable v = FnObservable {
-  retrieveFn :: IO v,
+  retrieveFn :: forall m. HasResourceManager m => m (AsyncTask v),
   observeFn :: (ObservableMessage v -> IO ()) -> IO Disposable
 }
 instance IsRetrievable v (FnObservable v) where
-  retrieve o = liftIO $ successfulTask <$> retrieveFn o
+  retrieve o = retrieveFn o
 instance IsObservable v (FnObservable v) where
   observe o = observeFn o
   mapObservable f FnObservable{retrieveFn, observeFn} = Observable $ FnObservable {
-    retrieveFn = f <$> retrieveFn,
+    retrieveFn = f <<$>> retrieveFn,
     observeFn = \listener -> observeFn (listener . fmap f)
   }
 
+-- | Implement an Observable by directly providing functions for `retrieve` and `subscribe`.
+fnObservable
+  :: ((ObservableMessage v -> IO ()) -> IO Disposable)
+  -> (forall m. HasResourceManager m => m (AsyncTask v))
+  -> Observable v
+fnObservable observeFn retrieveFn = toObservable FnObservable{observeFn, retrieveFn}
+
+-- | Implement an Observable by directly providing functions for `retrieve` and `subscribe`.
+synchronousFnObservable
+  :: forall v. ((ObservableMessage v -> IO ()) -> IO Disposable)
+  -> IO v
+  -> Observable v
+synchronousFnObservable observeFn synchronousRetrieveFn = fnObservable observeFn retrieveFn
+  where
+    retrieveFn :: (forall m. HasResourceManager m => m (AsyncTask v))
+    retrieveFn = liftIO $ successfulTask <$> synchronousRetrieveFn
+
 
 newtype ConstObservable v = ConstObservable v
 instance IsRetrievable v (ConstObservable v) where
diff --git a/src/Quasar/Observable/ObservableHashMap.hs b/src/Quasar/Observable/ObservableHashMap.hs
index 3b12db2..c183ffc 100644
--- a/src/Quasar/Observable/ObservableHashMap.hs
+++ b/src/Quasar/Observable/ObservableHashMap.hs
@@ -115,7 +115,7 @@ new :: IO (ObservableHashMap k v)
 new = ObservableHashMap <$> newMVar Handle{keyHandles=HM.empty, subscribers=HM.empty, deltaSubscribers=HM.empty}
 
 observeKey :: forall k v. (Eq k, Hashable k) => k -> ObservableHashMap k v -> Observable (Maybe v)
-observeKey key ohm@(ObservableHashMap mvar) = Observable FnObservable{retrieveFn, observeFn}
+observeKey key ohm@(ObservableHashMap mvar) = synchronousFnObservable observeFn retrieveFn
   where
     retrieveFn :: IO (Maybe v)
     retrieveFn = liftIO $ join . preview (_keyHandles . at key . _Just . _value) <$> readMVar mvar
-- 
GitLab