From 970b8adc6664351a6fca3b3395075479fc5ead38 Mon Sep 17 00:00:00 2001 From: Jens Nolte <git@queezle.net> Date: Wed, 11 Aug 2021 03:02:59 +0200 Subject: [PATCH] Implement bind and catch for observables and add more instances --- src/Quasar/Observable.hs | 258 ++++++++++++++++++++++++++++----------- 1 file changed, 188 insertions(+), 70 deletions(-) diff --git a/src/Quasar/Observable.hs b/src/Quasar/Observable.hs index a529e8a..f119cd6 100644 --- a/src/Quasar/Observable.hs +++ b/src/Quasar/Observable.hs @@ -1,4 +1,5 @@ {-# LANGUAGE UndecidableInstances #-} +{-# LANGUAGE ViewPatterns #-} module Quasar.Observable ( -- * Observable core types @@ -28,8 +29,10 @@ module Quasar.Observable ( ObservableCallback, ) where +import Control.Applicative import Control.Concurrent.MVar import Control.Concurrent.STM +import Control.Monad.Catch import Control.Monad.Except import Control.Monad.Reader import Control.Monad.Trans.Maybe @@ -45,14 +48,14 @@ import System.IO (fixIO) data ObservableMessage a = ObservableUpdate a - | ObservableConnecting + | ObservableLoading | ObservableReconnecting SomeException | ObservableNotAvailable SomeException deriving stock (Show, Generic) instance Functor ObservableMessage where fmap fn (ObservableUpdate x) = ObservableUpdate (fn x) - fmap _ ObservableConnecting = ObservableConnecting + fmap _ ObservableLoading = ObservableLoading fmap _ (ObservableReconnecting ex) = ObservableReconnecting ex fmap _ (ObservableNotAvailable ex) = ObservableNotAvailable ex @@ -62,8 +65,8 @@ instance Applicative ObservableMessage where liftA2 _ _ (ObservableNotAvailable ex) = ObservableNotAvailable ex liftA2 _ (ObservableReconnecting ex) _ = ObservableReconnecting ex liftA2 _ _ (ObservableReconnecting ex) = ObservableReconnecting ex - liftA2 _ ObservableConnecting _ = ObservableConnecting - liftA2 _ _ ObservableConnecting = ObservableConnecting + liftA2 _ ObservableLoading _ = ObservableLoading + liftA2 _ _ ObservableLoading = ObservableLoading liftA2 fn (ObservableUpdate x) (ObservableUpdate y) = ObservableUpdate (fn x y) @@ -113,11 +116,30 @@ instance IsObservable v (Observable v) where instance Functor Observable where fmap f = mapObservable f + instance Applicative Observable where - pure = constObservable - liftA2 = mergeObservable + pure = toObservable . ConstObservable + liftA2 fn x y = toObservable $ MergedObservable fn x y + instance Monad Observable where - (>>=) = bindObservable + x >>= y = toObservable $ BindObservable x y + +instance MonadThrow Observable where + throwM :: forall e v. Exception e => e -> Observable v + throwM = toObservable . FailedObservable @v . toException + +instance MonadCatch Observable where + catch action handler = toObservable $ CatchObservable action handler + +instance MonadFail Observable where + fail = throwM . userError + +instance Alternative Observable where + empty = fail "empty" + x <|> y = x `catchAll` const y + +instance MonadPlus Observable + data MappedObservable b = forall a o. IsObservable a o => MappedObservable (a -> b) o @@ -128,6 +150,146 @@ instance IsObservable v (MappedObservable v) where mapObservable f1 (MappedObservable f2 upstream) = Observable $ MappedObservable (f1 . f2) upstream + +data BindObservable r = forall a. BindObservable (Observable a) (a -> Observable r) + +instance IsRetrievable r (BindObservable r) where + retrieve (BindObservable fx fn) = async $ do + x <- awaitResult $ retrieve fx + awaitResult $ retrieve $ fn x + +instance IsObservable r (BindObservable r) where + observe :: forall r. (BindObservable r) -> (ObservableMessage r -> IO ()) -> IO Disposable + observe (BindObservable fx fn) callback = do + -- Create a resource manager to ensure all subscriptions are cleaned up when disposing. + resourceManager <- newResourceManager unlimitedResourceManagerConfiguration + + isDisposingVar <- newTVarIO False + disposableVar <- newTMVarIO noDisposable + keyVar <- newTMVarIO Nothing + + leftDisposable <- observe fx (outerCallback resourceManager isDisposingVar disposableVar keyVar) + + attachDisposeAction_ resourceManager $ do + atomically $ writeTVar isDisposingVar True + d1 <- dispose leftDisposable + -- Block while the `outerCallback` is running + d2 <- dispose =<< atomically (takeTMVar disposableVar) + pure (d1 <> d2) + + pure $ toDisposable resourceManager + where + outerCallback resourceManager isDisposingVar disposableVar keyVar observableMessage = mask $ \unmask -> do + key <- newUnique + + join $ atomically $ do + readTVar isDisposingVar >>= \case + False -> do + -- Blocks while an inner callback is running + void $ swapTMVar keyVar (Just key) + + oldDisposable <- takeTMVar disposableVar + + -- IO action that will run after the STM transaction + pure $ do + disposeEventually resourceManager oldDisposable + + newDisposable <- + unmask (outerMessageHandler key observableMessage) + `onException` + atomically (putTMVar disposableVar noDisposable) + + atomically $ putTMVar disposableVar newDisposable + + -- When already disposing no new handlers should be registered + True -> pure $ pure () + + where + outerMessageHandler key (ObservableUpdate x) = observe (fn x) (innerCallback key) + outerMessageHandler key (ObservableLoading) = noDisposable <$ callback ObservableLoading + outerMessageHandler key (ObservableReconnecting ex) = noDisposable <$ callback (ObservableReconnecting ex) + outerMessageHandler key (ObservableNotAvailable ex) = noDisposable <$ callback (ObservableNotAvailable ex) + + innerCallback :: Unique -> ObservableMessage r -> IO () + innerCallback key x = do + bracket + -- Take key var to prevent parallel callbacks + (atomically $ takeTMVar keyVar) + -- Put key back + (atomically . putTMVar keyVar) + -- Call callback when key is still valid + (\currentKey -> when (Just key == currentKey) $ callback x) + + + +data CatchObservable e r = Exception e => CatchObservable (Observable r) (e -> Observable r) + +instance IsRetrievable r (CatchObservable e r) where + retrieve (CatchObservable fx fn) = async $ + awaitResult (retrieve fx) `catch` \ex -> awaitResult (retrieve (fn ex)) + +instance IsObservable r (CatchObservable e r) where + observe :: forall e r. (CatchObservable e r) -> (ObservableMessage r -> IO ()) -> IO Disposable + observe (CatchObservable fx fn) callback = do + -- Create a resource manager to ensure all subscriptions are cleaned up when disposing. + resourceManager <- newResourceManager unlimitedResourceManagerConfiguration + + isDisposingVar <- newTVarIO False + disposableVar <- newTMVarIO noDisposable + keyVar <- newTMVarIO Nothing + + leftDisposable <- observe fx (outerCallback resourceManager isDisposingVar disposableVar keyVar) + + attachDisposeAction_ resourceManager $ do + atomically $ writeTVar isDisposingVar True + d1 <- dispose leftDisposable + -- Block while the `outerCallback` is running + d2 <- dispose =<< atomically (takeTMVar disposableVar) + pure (d1 <> d2) + + pure $ toDisposable resourceManager + where + outerCallback resourceManager isDisposingVar disposableVar keyVar observableMessage = mask $ \unmask -> do + key <- newUnique + + join $ atomically $ do + readTVar isDisposingVar >>= \case + False -> do + -- Blocks while an inner callback is running + void $ swapTMVar keyVar (Just key) + + oldDisposable <- takeTMVar disposableVar + + -- IO action that will run after the STM transaction + pure $ do + disposeEventually resourceManager oldDisposable + + newDisposable <- + unmask (outerMessageHandler key observableMessage) + `onException` + atomically (putTMVar disposableVar noDisposable) + + atomically $ putTMVar disposableVar newDisposable + + -- When already disposing no new handlers should be registered + True -> pure $ pure () + + where + outerMessageHandler key msg@(ObservableNotAvailable (fromException -> Just ex)) = observe (fn ex) (innerCallback key) + outerMessageHandler key msg = noDisposable <$ callback msg + + innerCallback :: Unique -> ObservableMessage r -> IO () + innerCallback key x = do + bracket + -- Take key var to prevent parallel callbacks + (atomically $ takeTMVar keyVar) + -- Put key back + (atomically . putTMVar keyVar) + -- Call callback when key is still valid + (\currentKey -> when (Just key == currentKey) $ callback x) + + + newtype ObservableVar v = ObservableVar (MVar (v, HM.HashMap Unique (ObservableCallback v))) instance IsRetrievable v (ObservableVar v) where retrieve (ObservableVar mvar) = liftIO $ successfulTask . fst <$> readMVar mvar @@ -173,68 +335,16 @@ withObservableVar (ObservableVar mvar) f = withMVar mvar (f . fst) bindObservable :: (IsObservable a ma, IsObservable b mb) => ma -> (a -> mb) -> Observable b -bindObservable x fy = joinObservable $ mapObservable fy x - - --- | Internal state of `JoinedObservable` -data JoinedObservableState - = JoinedObservableInactive - | JoinedObservableActive Unique Disposable - | JoinedObservableDisposed - -instance IsDisposable JoinedObservableState where - dispose JoinedObservableInactive = pure $ successfulAwaitable () - dispose (JoinedObservableActive _ disposable) = dispose disposable - dispose JoinedObservableDisposed = pure $ successfulAwaitable () - -newtype JoinedObservable o = JoinedObservable o -instance forall v o i. (IsRetrievable i o, IsRetrievable v i) => IsRetrievable v (JoinedObservable o) where - retrieve :: HasResourceManager m => JoinedObservable o -> m (Task v) - retrieve (JoinedObservable outer) = async $ await =<< retrieve =<< await =<< retrieve outer -instance forall v o i. (IsObservable i o, IsObservable v i) => IsObservable v (JoinedObservable o) where - observe :: JoinedObservable o -> (ObservableMessage v -> IO ()) -> IO Disposable - observe (JoinedObservable outer) callback = do - -- Create a resource manager to ensure all subscriptions are cleaned up when disposing. - resourceManager <- newResourceManager unlimitedResourceManagerConfiguration - - stateMVar <- newMVar JoinedObservableInactive - outerDisposable <- observe outer (outerCallback resourceManager stateMVar) - - attachDisposeAction_ resourceManager $ do - d1 <- dispose outerDisposable - d2 <- modifyMVar stateMVar $ \state -> do - d2temp <- dispose state - pure (JoinedObservableDisposed, d2temp) - pure $ d1 <> d2 - - pure $ toDisposable resourceManager - where - outerCallback :: ResourceManager -> MVar JoinedObservableState -> ObservableMessage i -> IO () - outerCallback resourceManager stateMVar message = do - oldState <- takeMVar stateMVar - disposeEventually resourceManager oldState - key <- newUnique - newState <- outerCallbackObserve key message - putMVar stateMVar newState - where - outerCallbackObserve :: Unique -> ObservableMessage i -> IO JoinedObservableState - outerCallbackObserve key (ObservableUpdate innerObservable) = JoinedObservableActive key <$> observe innerObservable (filteredCallback key) - outerCallbackObserve _ ObservableConnecting = JoinedObservableInactive <$ callback ObservableConnecting - outerCallbackObserve _ (ObservableReconnecting ex) = JoinedObservableInactive <$ callback (ObservableReconnecting ex) - outerCallbackObserve _ (ObservableNotAvailable ex) = JoinedObservableInactive <$ callback (ObservableNotAvailable ex) - filteredCallback :: Unique -> ObservableMessage v -> IO () - filteredCallback key msg = - -- TODO write a version that does not deadlock when `observe` calls the callback directly - undefined - withMVar stateMVar $ \case - JoinedObservableActive activeKey _ -> callback msg - _ -> pure () +bindObservable fx fn = (toObservable fx) >>= \x -> toObservable (fn x) joinObservable :: (IsObservable i o, IsObservable v i) => o -> Observable v -joinObservable = Observable . JoinedObservable - +joinObservable = join . fmap toObservable . toObservable +-- | Merge two observables using a given merge function. Whenever one of the inputs is updated, the resulting +-- observable updates according to the merge function. +-- +-- There is no caching involed, every subscriber effectively subscribes to both input observables. data MergedObservable r o0 v0 o1 v1 = MergedObservable (v0 -> v1 -> r) o0 o1 instance forall r o0 v0 o1 v1. (IsRetrievable v0 o0, IsRetrievable v1 o1) => IsRetrievable r (MergedObservable r o0 v0 o1 v1) where retrieve (MergedObservable merge obs0 obs1) = liftA2 (liftA2 merge) (retrieve obs0) (retrieve obs1) @@ -256,7 +366,10 @@ instance forall r o0 v0 o1 v1. (IsObservable v0 o0, IsObservable v1 o1) => IsObs mapM_ callback mMerged --- | Merge two observables using a given merge function. Whenever one of the inputs is updated, the resulting observable updates according to the merge function. +-- | Merge two observables using a given merge function. Whenever one of the inputs is updated, the resulting +-- observable updates according to the merge function. +-- +-- Behaves like `liftA2` on `Observable` but accepts anything that implements `IsObservable`.. -- -- There is no caching involed, every subscriber effectively subscribes to both input observables. mergeObservable :: (IsObservable v0 o0, IsObservable v1 o1) => (v0 -> v1 -> r) -> o0 -> o1 -> Observable r @@ -296,14 +409,19 @@ synchronousFnObservable observeFn synchronousRetrieveFn = fnObservable observeFn newtype ConstObservable v = ConstObservable v instance IsRetrievable v (ConstObservable v) where retrieve (ConstObservable x) = pure $ pure x -instance IsObservable a (ConstObservable a) where +instance IsObservable v (ConstObservable v) where observe (ConstObservable x) callback = do callback $ ObservableUpdate x pure noDisposable --- | Create an observable that contains a constant value. -constObservable :: v -> Observable v -constObservable = Observable . ConstObservable + +newtype FailedObservable v = FailedObservable SomeException +instance IsRetrievable v (FailedObservable v) where + retrieve (FailedObservable ex) = liftIO $ throwIO ex +instance IsObservable v (FailedObservable v) where + observe (FailedObservable ex) callback = do + callback $ ObservableNotAvailable ex + pure noDisposable -- | Create an observable by simply running an IO action whenever a value is requested or a callback is registered. -- GitLab