Skip to content
Snippets Groups Projects
Commit 8e9478b4 authored by Jens Nolte's avatar Jens Nolte
Browse files

Implement Monad instance for Observable

parent 8a9c87ad
No related branches found
No related tags found
No related merge requests found
......@@ -125,9 +125,9 @@ instance Applicative Observable where
pure = toObservable . ConstObservable
liftA2 fn x y = toObservable $ LiftA2Observable fn x y
--instance Monad Observable where
-- x >>= y = toObservable $ BindObservable x y
--
instance Monad Observable where
x >>= f = toObservable $ BindObservable x f
--instance MonadThrow Observable where
-- throwM :: forall e v. Exception e => e -> Observable v
-- throwM = toObservable . FailedObservable @v . toException
......@@ -226,7 +226,7 @@ instance IsRetrievable r (MappedObservable r) where
instance IsObservable r (MappedObservable r) where
observe (MappedObservable fn observable) callback = observe observable (callback . fmap fn)
pingObservable (MappedObservable _ observable) = pingObservable observable
mapObservable f1 (MappedObservable f2 upstream) = Observable $ MappedObservable (f1 . f2) upstream
mapObservable f1 (MappedObservable f2 upstream) = toObservable $ MappedObservable (f1 . f2) upstream
-- | Merge two observables using a given merge function. Whenever one of the inputs is updated, the resulting
......@@ -258,52 +258,47 @@ instance IsObservable r (LiftA2Observable r) where
pingObservable fx
await future
mapObservable f1 (LiftA2Observable f2 fx fy) = Observable $ LiftA2Observable (\x y -> f1 (f2 x y)) fx fy
mapObservable f1 (LiftA2Observable f2 fx fy) = toObservable $ LiftA2Observable (\x y -> f1 (f2 x y)) fx fy
data BindObservable r = forall a. BindObservable (Observable a) (a -> Observable r)
instance IsRetrievable r (BindObservable r) where
retrieve (BindObservable fx fn) = do
x <- retrieve fx
retrieve $ fn x
instance IsObservable r (BindObservable r) where
observe (BindObservable fx fn) callback = ensureQuasarSTM do
callback ObservableLoading
keyVar <- newTVar =<< newUniqueSTM
disposableVar <- liftSTM $ newTVar trivialDisposer
observe fx (leftCallback keyVar disposableVar)
where
leftCallback keyVar disposableVar lmsg = do
disposeEventually_ =<< readTVar disposableVar
key <- newUniqueSTM
-- Dispose is not instant, so a key is used to disarm the callback derived from the last (now outdated) value
writeTVar keyVar key
disposer <- captureResources_
case lmsg of
ObservableValue x -> observe (fn x) (rightCallback key)
ObservableLoading -> callback ObservableLoading
ObservableNotAvailable ex -> callback (ObservableNotAvailable ex)
writeTVar disposableVar disposer
where
rightCallback :: Unique -> ObservableCallback r
rightCallback callbackKey rmsg = do
activeKey <- readTVar keyVar
when (callbackKey == activeKey) (callback rmsg)
pingObservable (BindObservable fx fn) = do
x <- retrieve fx
pingObservable (fn x)
mapObservable f (BindObservable fx fn) = toObservable $ BindObservable fx (f <<$>> fn)
--data BindObservable r = forall a. BindObservable (Observable a) (a -> Observable r)
--
--instance IsRetrievable r (BindObservable r) where
-- retrieve (BindObservable fx fn) = do
-- awaitable <- retrieve fx
-- value <- liftIO $ await awaitable
-- retrieve $ fn value
--
--instance IsObservable r (BindObservable r) where
-- observe (BindObservable fx fn) callback = do
-- disposableVar <- liftIO $ newTMVarIO noDisposable
-- keyVar <- liftIO $ newTMVarIO =<< newUnique
--
-- observe fx (leftCallback disposableVar keyVar)
-- where
-- leftCallback disposableVar keyVar message = do
-- key <- liftIO newUnique
--
-- oldDisposable <- liftIO $ atomically do
-- -- Blocks while `rightCallback` is running
-- void $ swapTMVar keyVar key
--
-- takeTMVar disposableVar
--
-- disposeEventually_ oldDisposable
--
-- disposable <- case message of
-- (ObservableValue x) -> captureDisposable_ $ observe (fn x) (rightCallback keyVar key)
-- ObservableLoading -> noDisposable <$ callback ObservableLoading
-- (ObservableNotAvailable ex) -> noDisposable <$ callback (ObservableNotAvailable ex)
--
-- liftIO $ atomically $ putTMVar disposableVar disposable
--
-- rightCallback :: TMVar Unique -> Unique -> ObservableState r -> ResourceManagerIO ()
-- rightCallback keyVar key message =
-- bracket
-- -- Take key var to prevent parallel callbacks
-- (liftIO $ atomically $ takeTMVar keyVar)
-- -- Put key back
-- (liftIO . atomically . putTMVar keyVar)
-- -- Ignore all callbacks that arrive from the old `fn` when a new `fx` has been observed
-- (\currentKey -> when (key == currentKey) $ callback message)
--
--
--data CatchObservable e r = Exception e => CatchObservable (Observable r) (e -> Observable r)
--
--instance IsRetrievable r (CatchObservable e r) where
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment