Skip to content
Snippets Groups Projects
Commit 970b8adc authored by Jens Nolte's avatar Jens Nolte
Browse files

Implement bind and catch for observables and add more instances

parent 4dfb8562
No related branches found
No related tags found
No related merge requests found
Pipeline #2356 passed
{-# LANGUAGE UndecidableInstances #-}
{-# LANGUAGE ViewPatterns #-}
module Quasar.Observable (
-- * Observable core types
......@@ -28,8 +29,10 @@ module Quasar.Observable (
ObservableCallback,
) where
import Control.Applicative
import Control.Concurrent.MVar
import Control.Concurrent.STM
import Control.Monad.Catch
import Control.Monad.Except
import Control.Monad.Reader
import Control.Monad.Trans.Maybe
......@@ -45,14 +48,14 @@ import System.IO (fixIO)
data ObservableMessage a
= ObservableUpdate a
| ObservableConnecting
| ObservableLoading
| ObservableReconnecting SomeException
| ObservableNotAvailable SomeException
deriving stock (Show, Generic)
instance Functor ObservableMessage where
fmap fn (ObservableUpdate x) = ObservableUpdate (fn x)
fmap _ ObservableConnecting = ObservableConnecting
fmap _ ObservableLoading = ObservableLoading
fmap _ (ObservableReconnecting ex) = ObservableReconnecting ex
fmap _ (ObservableNotAvailable ex) = ObservableNotAvailable ex
......@@ -62,8 +65,8 @@ instance Applicative ObservableMessage where
liftA2 _ _ (ObservableNotAvailable ex) = ObservableNotAvailable ex
liftA2 _ (ObservableReconnecting ex) _ = ObservableReconnecting ex
liftA2 _ _ (ObservableReconnecting ex) = ObservableReconnecting ex
liftA2 _ ObservableConnecting _ = ObservableConnecting
liftA2 _ _ ObservableConnecting = ObservableConnecting
liftA2 _ ObservableLoading _ = ObservableLoading
liftA2 _ _ ObservableLoading = ObservableLoading
liftA2 fn (ObservableUpdate x) (ObservableUpdate y) = ObservableUpdate (fn x y)
......@@ -113,11 +116,30 @@ instance IsObservable v (Observable v) where
instance Functor Observable where
fmap f = mapObservable f
instance Applicative Observable where
pure = constObservable
liftA2 = mergeObservable
pure = toObservable . ConstObservable
liftA2 fn x y = toObservable $ MergedObservable fn x y
instance Monad Observable where
(>>=) = bindObservable
x >>= y = toObservable $ BindObservable x y
instance MonadThrow Observable where
throwM :: forall e v. Exception e => e -> Observable v
throwM = toObservable . FailedObservable @v . toException
instance MonadCatch Observable where
catch action handler = toObservable $ CatchObservable action handler
instance MonadFail Observable where
fail = throwM . userError
instance Alternative Observable where
empty = fail "empty"
x <|> y = x `catchAll` const y
instance MonadPlus Observable
data MappedObservable b = forall a o. IsObservable a o => MappedObservable (a -> b) o
......@@ -128,6 +150,146 @@ instance IsObservable v (MappedObservable v) where
mapObservable f1 (MappedObservable f2 upstream) = Observable $ MappedObservable (f1 . f2) upstream
data BindObservable r = forall a. BindObservable (Observable a) (a -> Observable r)
instance IsRetrievable r (BindObservable r) where
retrieve (BindObservable fx fn) = async $ do
x <- awaitResult $ retrieve fx
awaitResult $ retrieve $ fn x
instance IsObservable r (BindObservable r) where
observe :: forall r. (BindObservable r) -> (ObservableMessage r -> IO ()) -> IO Disposable
observe (BindObservable fx fn) callback = do
-- Create a resource manager to ensure all subscriptions are cleaned up when disposing.
resourceManager <- newResourceManager unlimitedResourceManagerConfiguration
isDisposingVar <- newTVarIO False
disposableVar <- newTMVarIO noDisposable
keyVar <- newTMVarIO Nothing
leftDisposable <- observe fx (outerCallback resourceManager isDisposingVar disposableVar keyVar)
attachDisposeAction_ resourceManager $ do
atomically $ writeTVar isDisposingVar True
d1 <- dispose leftDisposable
-- Block while the `outerCallback` is running
d2 <- dispose =<< atomically (takeTMVar disposableVar)
pure (d1 <> d2)
pure $ toDisposable resourceManager
where
outerCallback resourceManager isDisposingVar disposableVar keyVar observableMessage = mask $ \unmask -> do
key <- newUnique
join $ atomically $ do
readTVar isDisposingVar >>= \case
False -> do
-- Blocks while an inner callback is running
void $ swapTMVar keyVar (Just key)
oldDisposable <- takeTMVar disposableVar
-- IO action that will run after the STM transaction
pure $ do
disposeEventually resourceManager oldDisposable
newDisposable <-
unmask (outerMessageHandler key observableMessage)
`onException`
atomically (putTMVar disposableVar noDisposable)
atomically $ putTMVar disposableVar newDisposable
-- When already disposing no new handlers should be registered
True -> pure $ pure ()
where
outerMessageHandler key (ObservableUpdate x) = observe (fn x) (innerCallback key)
outerMessageHandler key (ObservableLoading) = noDisposable <$ callback ObservableLoading
outerMessageHandler key (ObservableReconnecting ex) = noDisposable <$ callback (ObservableReconnecting ex)
outerMessageHandler key (ObservableNotAvailable ex) = noDisposable <$ callback (ObservableNotAvailable ex)
innerCallback :: Unique -> ObservableMessage r -> IO ()
innerCallback key x = do
bracket
-- Take key var to prevent parallel callbacks
(atomically $ takeTMVar keyVar)
-- Put key back
(atomically . putTMVar keyVar)
-- Call callback when key is still valid
(\currentKey -> when (Just key == currentKey) $ callback x)
data CatchObservable e r = Exception e => CatchObservable (Observable r) (e -> Observable r)
instance IsRetrievable r (CatchObservable e r) where
retrieve (CatchObservable fx fn) = async $
awaitResult (retrieve fx) `catch` \ex -> awaitResult (retrieve (fn ex))
instance IsObservable r (CatchObservable e r) where
observe :: forall e r. (CatchObservable e r) -> (ObservableMessage r -> IO ()) -> IO Disposable
observe (CatchObservable fx fn) callback = do
-- Create a resource manager to ensure all subscriptions are cleaned up when disposing.
resourceManager <- newResourceManager unlimitedResourceManagerConfiguration
isDisposingVar <- newTVarIO False
disposableVar <- newTMVarIO noDisposable
keyVar <- newTMVarIO Nothing
leftDisposable <- observe fx (outerCallback resourceManager isDisposingVar disposableVar keyVar)
attachDisposeAction_ resourceManager $ do
atomically $ writeTVar isDisposingVar True
d1 <- dispose leftDisposable
-- Block while the `outerCallback` is running
d2 <- dispose =<< atomically (takeTMVar disposableVar)
pure (d1 <> d2)
pure $ toDisposable resourceManager
where
outerCallback resourceManager isDisposingVar disposableVar keyVar observableMessage = mask $ \unmask -> do
key <- newUnique
join $ atomically $ do
readTVar isDisposingVar >>= \case
False -> do
-- Blocks while an inner callback is running
void $ swapTMVar keyVar (Just key)
oldDisposable <- takeTMVar disposableVar
-- IO action that will run after the STM transaction
pure $ do
disposeEventually resourceManager oldDisposable
newDisposable <-
unmask (outerMessageHandler key observableMessage)
`onException`
atomically (putTMVar disposableVar noDisposable)
atomically $ putTMVar disposableVar newDisposable
-- When already disposing no new handlers should be registered
True -> pure $ pure ()
where
outerMessageHandler key msg@(ObservableNotAvailable (fromException -> Just ex)) = observe (fn ex) (innerCallback key)
outerMessageHandler key msg = noDisposable <$ callback msg
innerCallback :: Unique -> ObservableMessage r -> IO ()
innerCallback key x = do
bracket
-- Take key var to prevent parallel callbacks
(atomically $ takeTMVar keyVar)
-- Put key back
(atomically . putTMVar keyVar)
-- Call callback when key is still valid
(\currentKey -> when (Just key == currentKey) $ callback x)
newtype ObservableVar v = ObservableVar (MVar (v, HM.HashMap Unique (ObservableCallback v)))
instance IsRetrievable v (ObservableVar v) where
retrieve (ObservableVar mvar) = liftIO $ successfulTask . fst <$> readMVar mvar
......@@ -173,68 +335,16 @@ withObservableVar (ObservableVar mvar) f = withMVar mvar (f . fst)
bindObservable :: (IsObservable a ma, IsObservable b mb) => ma -> (a -> mb) -> Observable b
bindObservable x fy = joinObservable $ mapObservable fy x
-- | Internal state of `JoinedObservable`
data JoinedObservableState
= JoinedObservableInactive
| JoinedObservableActive Unique Disposable
| JoinedObservableDisposed
instance IsDisposable JoinedObservableState where
dispose JoinedObservableInactive = pure $ successfulAwaitable ()
dispose (JoinedObservableActive _ disposable) = dispose disposable
dispose JoinedObservableDisposed = pure $ successfulAwaitable ()
newtype JoinedObservable o = JoinedObservable o
instance forall v o i. (IsRetrievable i o, IsRetrievable v i) => IsRetrievable v (JoinedObservable o) where
retrieve :: HasResourceManager m => JoinedObservable o -> m (Task v)
retrieve (JoinedObservable outer) = async $ await =<< retrieve =<< await =<< retrieve outer
instance forall v o i. (IsObservable i o, IsObservable v i) => IsObservable v (JoinedObservable o) where
observe :: JoinedObservable o -> (ObservableMessage v -> IO ()) -> IO Disposable
observe (JoinedObservable outer) callback = do
-- Create a resource manager to ensure all subscriptions are cleaned up when disposing.
resourceManager <- newResourceManager unlimitedResourceManagerConfiguration
stateMVar <- newMVar JoinedObservableInactive
outerDisposable <- observe outer (outerCallback resourceManager stateMVar)
attachDisposeAction_ resourceManager $ do
d1 <- dispose outerDisposable
d2 <- modifyMVar stateMVar $ \state -> do
d2temp <- dispose state
pure (JoinedObservableDisposed, d2temp)
pure $ d1 <> d2
pure $ toDisposable resourceManager
where
outerCallback :: ResourceManager -> MVar JoinedObservableState -> ObservableMessage i -> IO ()
outerCallback resourceManager stateMVar message = do
oldState <- takeMVar stateMVar
disposeEventually resourceManager oldState
key <- newUnique
newState <- outerCallbackObserve key message
putMVar stateMVar newState
where
outerCallbackObserve :: Unique -> ObservableMessage i -> IO JoinedObservableState
outerCallbackObserve key (ObservableUpdate innerObservable) = JoinedObservableActive key <$> observe innerObservable (filteredCallback key)
outerCallbackObserve _ ObservableConnecting = JoinedObservableInactive <$ callback ObservableConnecting
outerCallbackObserve _ (ObservableReconnecting ex) = JoinedObservableInactive <$ callback (ObservableReconnecting ex)
outerCallbackObserve _ (ObservableNotAvailable ex) = JoinedObservableInactive <$ callback (ObservableNotAvailable ex)
filteredCallback :: Unique -> ObservableMessage v -> IO ()
filteredCallback key msg =
-- TODO write a version that does not deadlock when `observe` calls the callback directly
undefined
withMVar stateMVar $ \case
JoinedObservableActive activeKey _ -> callback msg
_ -> pure ()
bindObservable fx fn = (toObservable fx) >>= \x -> toObservable (fn x)
joinObservable :: (IsObservable i o, IsObservable v i) => o -> Observable v
joinObservable = Observable . JoinedObservable
joinObservable = join . fmap toObservable . toObservable
-- | Merge two observables using a given merge function. Whenever one of the inputs is updated, the resulting
-- observable updates according to the merge function.
--
-- There is no caching involed, every subscriber effectively subscribes to both input observables.
data MergedObservable r o0 v0 o1 v1 = MergedObservable (v0 -> v1 -> r) o0 o1
instance forall r o0 v0 o1 v1. (IsRetrievable v0 o0, IsRetrievable v1 o1) => IsRetrievable r (MergedObservable r o0 v0 o1 v1) where
retrieve (MergedObservable merge obs0 obs1) = liftA2 (liftA2 merge) (retrieve obs0) (retrieve obs1)
......@@ -256,7 +366,10 @@ instance forall r o0 v0 o1 v1. (IsObservable v0 o0, IsObservable v1 o1) => IsObs
mapM_ callback mMerged
-- | Merge two observables using a given merge function. Whenever one of the inputs is updated, the resulting observable updates according to the merge function.
-- | Merge two observables using a given merge function. Whenever one of the inputs is updated, the resulting
-- observable updates according to the merge function.
--
-- Behaves like `liftA2` on `Observable` but accepts anything that implements `IsObservable`..
--
-- There is no caching involed, every subscriber effectively subscribes to both input observables.
mergeObservable :: (IsObservable v0 o0, IsObservable v1 o1) => (v0 -> v1 -> r) -> o0 -> o1 -> Observable r
......@@ -296,14 +409,19 @@ synchronousFnObservable observeFn synchronousRetrieveFn = fnObservable observeFn
newtype ConstObservable v = ConstObservable v
instance IsRetrievable v (ConstObservable v) where
retrieve (ConstObservable x) = pure $ pure x
instance IsObservable a (ConstObservable a) where
instance IsObservable v (ConstObservable v) where
observe (ConstObservable x) callback = do
callback $ ObservableUpdate x
pure noDisposable
-- | Create an observable that contains a constant value.
constObservable :: v -> Observable v
constObservable = Observable . ConstObservable
newtype FailedObservable v = FailedObservable SomeException
instance IsRetrievable v (FailedObservable v) where
retrieve (FailedObservable ex) = liftIO $ throwIO ex
instance IsObservable v (FailedObservable v) where
observe (FailedObservable ex) callback = do
callback $ ObservableNotAvailable ex
pure noDisposable
-- | Create an observable by simply running an IO action whenever a value is requested or a callback is registered.
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment