From 8e9478b434ca7922651dfed8c3905b3090433bf9 Mon Sep 17 00:00:00 2001
From: Jens Nolte <git@queezle.net>
Date: Sun, 13 Mar 2022 16:05:38 +0100
Subject: [PATCH] Implement Monad instance for Observable

---
 src/Quasar/Observable.hs | 93 +++++++++++++++++++---------------------
 1 file changed, 44 insertions(+), 49 deletions(-)

diff --git a/src/Quasar/Observable.hs b/src/Quasar/Observable.hs
index dc47cb0..e156c6b 100644
--- a/src/Quasar/Observable.hs
+++ b/src/Quasar/Observable.hs
@@ -125,9 +125,9 @@ instance Applicative Observable where
   pure = toObservable . ConstObservable
   liftA2 fn x y = toObservable $ LiftA2Observable fn x y
 
---instance Monad Observable where
---  x >>= y = toObservable $ BindObservable x y
---
+instance Monad Observable where
+  x >>= f = toObservable $ BindObservable x f
+
 --instance MonadThrow Observable where
 --  throwM :: forall e v. Exception e => e -> Observable v
 --  throwM = toObservable . FailedObservable @v . toException
@@ -226,7 +226,7 @@ instance IsRetrievable r (MappedObservable r) where
 instance IsObservable r (MappedObservable r) where
   observe (MappedObservable fn observable) callback = observe observable (callback . fmap fn)
   pingObservable (MappedObservable _ observable) = pingObservable observable
-  mapObservable f1 (MappedObservable f2 upstream) = Observable $ MappedObservable (f1 . f2) upstream
+  mapObservable f1 (MappedObservable f2 upstream) = toObservable $ MappedObservable (f1 . f2) upstream
 
 
 -- | Merge two observables using a given merge function. Whenever one of the inputs is updated, the resulting
@@ -258,52 +258,47 @@ instance IsObservable r (LiftA2Observable r) where
     pingObservable fx
     await future
 
-  mapObservable f1 (LiftA2Observable f2 fx fy) = Observable $ LiftA2Observable (\x y -> f1 (f2 x y)) fx fy
+  mapObservable f1 (LiftA2Observable f2 fx fy) = toObservable $ LiftA2Observable (\x y -> f1 (f2 x y)) fx fy
+
+
+data BindObservable r = forall a. BindObservable (Observable a) (a -> Observable r)
+
+instance IsRetrievable r (BindObservable r) where
+  retrieve (BindObservable fx fn) = do
+    x <- retrieve fx
+    retrieve $ fn x
+
+instance IsObservable r (BindObservable r) where
+  observe (BindObservable fx fn) callback = ensureQuasarSTM do
+    callback ObservableLoading
+    keyVar <- newTVar =<< newUniqueSTM
+    disposableVar <- liftSTM $ newTVar trivialDisposer
+    observe fx (leftCallback keyVar disposableVar)
+    where
+      leftCallback keyVar disposableVar lmsg = do
+        disposeEventually_ =<< readTVar disposableVar
+        key <- newUniqueSTM
+        -- Dispose is not instant, so a key is used to disarm the callback derived from the last (now outdated) value
+        writeTVar keyVar key
+        disposer <- captureResources_
+          case lmsg of
+            ObservableValue x -> observe (fn x) (rightCallback key)
+            ObservableLoading -> callback ObservableLoading
+            ObservableNotAvailable ex -> callback (ObservableNotAvailable ex)
+        writeTVar disposableVar disposer
+        where
+          rightCallback :: Unique -> ObservableCallback r
+          rightCallback callbackKey rmsg = do
+            activeKey <- readTVar keyVar
+            when (callbackKey == activeKey) (callback rmsg)
+
+  pingObservable (BindObservable fx fn) = do
+    x <- retrieve fx
+    pingObservable (fn x)
+
+  mapObservable f (BindObservable fx fn) = toObservable $ BindObservable fx (f <<$>> fn)
+
 
---data BindObservable r = forall a. BindObservable (Observable a) (a -> Observable r)
---
---instance IsRetrievable r (BindObservable r) where
---  retrieve (BindObservable fx fn) = do
---    awaitable <- retrieve fx
---    value <- liftIO $ await awaitable
---    retrieve $ fn value
---
---instance IsObservable r (BindObservable r) where
---  observe (BindObservable fx fn) callback = do
---    disposableVar <- liftIO $ newTMVarIO noDisposable
---    keyVar <- liftIO $ newTMVarIO =<< newUnique
---
---    observe fx (leftCallback disposableVar keyVar)
---    where
---      leftCallback disposableVar keyVar message = do
---        key <- liftIO newUnique
---
---        oldDisposable <- liftIO $ atomically do
---          -- Blocks while `rightCallback` is running
---          void $ swapTMVar keyVar key
---
---          takeTMVar disposableVar
---
---        disposeEventually_ oldDisposable
---
---        disposable <- case message of
---          (ObservableValue x) -> captureDisposable_ $ observe (fn x) (rightCallback keyVar key)
---          ObservableLoading -> noDisposable <$ callback ObservableLoading
---          (ObservableNotAvailable ex) -> noDisposable <$ callback (ObservableNotAvailable ex)
---
---        liftIO $ atomically $ putTMVar disposableVar disposable
---
---      rightCallback :: TMVar Unique -> Unique -> ObservableState r -> ResourceManagerIO ()
---      rightCallback keyVar key message =
---        bracket
---          -- Take key var to prevent parallel callbacks
---          (liftIO $ atomically $ takeTMVar keyVar)
---          -- Put key back
---          (liftIO . atomically . putTMVar keyVar)
---          -- Ignore all callbacks that arrive from the old `fn` when a new `fx` has been observed
---          (\currentKey -> when (key == currentKey) $ callback message)
---
---
 --data CatchObservable e r = Exception e => CatchObservable (Observable r) (e -> Observable r)
 --
 --instance IsRetrievable r (CatchObservable e r) where
-- 
GitLab