diff --git a/src/Quasar/Observable.hs b/src/Quasar/Observable.hs index dc47cb0bfdd0012898b5e4fe430160e751a99fc1..e156c6bd96f227e44799f12f302dc1aac8f83c4b 100644 --- a/src/Quasar/Observable.hs +++ b/src/Quasar/Observable.hs @@ -125,9 +125,9 @@ instance Applicative Observable where pure = toObservable . ConstObservable liftA2 fn x y = toObservable $ LiftA2Observable fn x y ---instance Monad Observable where --- x >>= y = toObservable $ BindObservable x y --- +instance Monad Observable where + x >>= f = toObservable $ BindObservable x f + --instance MonadThrow Observable where -- throwM :: forall e v. Exception e => e -> Observable v -- throwM = toObservable . FailedObservable @v . toException @@ -226,7 +226,7 @@ instance IsRetrievable r (MappedObservable r) where instance IsObservable r (MappedObservable r) where observe (MappedObservable fn observable) callback = observe observable (callback . fmap fn) pingObservable (MappedObservable _ observable) = pingObservable observable - mapObservable f1 (MappedObservable f2 upstream) = Observable $ MappedObservable (f1 . f2) upstream + mapObservable f1 (MappedObservable f2 upstream) = toObservable $ MappedObservable (f1 . f2) upstream -- | Merge two observables using a given merge function. Whenever one of the inputs is updated, the resulting @@ -258,52 +258,47 @@ instance IsObservable r (LiftA2Observable r) where pingObservable fx await future - mapObservable f1 (LiftA2Observable f2 fx fy) = Observable $ LiftA2Observable (\x y -> f1 (f2 x y)) fx fy + mapObservable f1 (LiftA2Observable f2 fx fy) = toObservable $ LiftA2Observable (\x y -> f1 (f2 x y)) fx fy + + +data BindObservable r = forall a. BindObservable (Observable a) (a -> Observable r) + +instance IsRetrievable r (BindObservable r) where + retrieve (BindObservable fx fn) = do + x <- retrieve fx + retrieve $ fn x + +instance IsObservable r (BindObservable r) where + observe (BindObservable fx fn) callback = ensureQuasarSTM do + callback ObservableLoading + keyVar <- newTVar =<< newUniqueSTM + disposableVar <- liftSTM $ newTVar trivialDisposer + observe fx (leftCallback keyVar disposableVar) + where + leftCallback keyVar disposableVar lmsg = do + disposeEventually_ =<< readTVar disposableVar + key <- newUniqueSTM + -- Dispose is not instant, so a key is used to disarm the callback derived from the last (now outdated) value + writeTVar keyVar key + disposer <- captureResources_ + case lmsg of + ObservableValue x -> observe (fn x) (rightCallback key) + ObservableLoading -> callback ObservableLoading + ObservableNotAvailable ex -> callback (ObservableNotAvailable ex) + writeTVar disposableVar disposer + where + rightCallback :: Unique -> ObservableCallback r + rightCallback callbackKey rmsg = do + activeKey <- readTVar keyVar + when (callbackKey == activeKey) (callback rmsg) + + pingObservable (BindObservable fx fn) = do + x <- retrieve fx + pingObservable (fn x) + + mapObservable f (BindObservable fx fn) = toObservable $ BindObservable fx (f <<$>> fn) + ---data BindObservable r = forall a. BindObservable (Observable a) (a -> Observable r) --- ---instance IsRetrievable r (BindObservable r) where --- retrieve (BindObservable fx fn) = do --- awaitable <- retrieve fx --- value <- liftIO $ await awaitable --- retrieve $ fn value --- ---instance IsObservable r (BindObservable r) where --- observe (BindObservable fx fn) callback = do --- disposableVar <- liftIO $ newTMVarIO noDisposable --- keyVar <- liftIO $ newTMVarIO =<< newUnique --- --- observe fx (leftCallback disposableVar keyVar) --- where --- leftCallback disposableVar keyVar message = do --- key <- liftIO newUnique --- --- oldDisposable <- liftIO $ atomically do --- -- Blocks while `rightCallback` is running --- void $ swapTMVar keyVar key --- --- takeTMVar disposableVar --- --- disposeEventually_ oldDisposable --- --- disposable <- case message of --- (ObservableValue x) -> captureDisposable_ $ observe (fn x) (rightCallback keyVar key) --- ObservableLoading -> noDisposable <$ callback ObservableLoading --- (ObservableNotAvailable ex) -> noDisposable <$ callback (ObservableNotAvailable ex) --- --- liftIO $ atomically $ putTMVar disposableVar disposable --- --- rightCallback :: TMVar Unique -> Unique -> ObservableState r -> ResourceManagerIO () --- rightCallback keyVar key message = --- bracket --- -- Take key var to prevent parallel callbacks --- (liftIO $ atomically $ takeTMVar keyVar) --- -- Put key back --- (liftIO . atomically . putTMVar keyVar) --- -- Ignore all callbacks that arrive from the old `fn` when a new `fx` has been observed --- (\currentKey -> when (key == currentKey) $ callback message) --- --- --data CatchObservable e r = Exception e => CatchObservable (Observable r) (e -> Observable r) -- --instance IsRetrievable r (CatchObservable e r) where