diff --git a/src/Quasar/Observable.hs b/src/Quasar/Observable.hs index dbf3902f730a8b5991f2b455b86f633f8dd6eecb..63cf838ca53b2af379f3282ef6bdb92662fd4b04 100644 --- a/src/Quasar/Observable.hs +++ b/src/Quasar/Observable.hs @@ -17,6 +17,8 @@ module Quasar.Observable ( modifyObservableVar_, -- * Helper functions + fnObservable, + synchronousFnObservable, mergeObservable, mergeObservableMaybe, joinObservable, @@ -27,13 +29,13 @@ module Quasar.Observable ( bindObservable, -- * Helper types - FnObservable(..), ObservableCallback, ) where import Control.Concurrent.MVar import Control.Concurrent.STM import Control.Monad.Except +import Control.Monad.Reader import Control.Monad.Trans.Maybe import Data.HashMap.Strict qualified as HM import Data.IORef @@ -176,6 +178,17 @@ bindObservable :: (IsObservable a ma, IsObservable b mb) => ma -> (a -> mb) -> O bindObservable x fy = joinObservable $ mapObservable fy x +-- | Internal state of `JoinedObservable` +data JoinedObservableState + = JoinedObservableInactive + | JoinedObservableActive Unique Disposable + | JoinedObservableDisposed + +instance IsDisposable JoinedObservableState where + dispose JoinedObservableInactive = pure $ successfulAwaitable () + dispose (JoinedObservableActive _ disposable) = dispose disposable + dispose JoinedObservableDisposed = pure $ successfulAwaitable () + newtype JoinedObservable o = JoinedObservable o instance forall v o i. (IsRetrievable i o, IsRetrievable v i) => IsRetrievable v (JoinedObservable o) where retrieve :: HasResourceManager m => JoinedObservable o -> m (AsyncTask v) @@ -183,27 +196,41 @@ instance forall v o i. (IsRetrievable i o, IsRetrievable v i) => IsRetrievable v instance forall v o i. (IsObservable i o, IsObservable v i) => IsObservable v (JoinedObservable o) where observe :: JoinedObservable o -> (ObservableMessage v -> IO ()) -> IO Disposable observe (JoinedObservable outer) callback = do - -- TODO: rewrite with latest semantics - -- the current implementation blocks the callback while `dispose` is running - innerDisposableMVar <- newMVar Nothing - outerDisposable <- observe outer (outerCallback innerDisposableMVar) - pure $ mkDisposable $ do - -- TODO use `disposeEventually` to immediately deregister handler (ignoring messages from the old callback after that) - undefined - --dispose outerDisposable - --mapM_ dispose =<< liftIO (readMVar innerDisposableMVar) + -- Create a resource manager to ensure all subscriptions are cleaned up when disposing. + resourceManager <- newResourceManager unlimitedResourceManagerConfiguration + + stateMVar <- newMVar JoinedObservableInactive + outerDisposable <- observe outer (outerCallback resourceManager stateMVar) + + attachDisposeAction_ resourceManager $ do + d1 <- dispose outerDisposable + d2 <- modifyMVar stateMVar $ \state -> do + d2temp <- dispose state + pure (JoinedObservableDisposed, d2temp) + pure $ d1 <> d2 + + pure $ toDisposable resourceManager where - outerCallback :: MVar (Maybe Disposable) -> ObservableMessage i -> IO () - outerCallback innerDisposableMVar message = do - oldInnerDisposable <- takeMVar innerDisposableMVar - mapM_ disposeIO oldInnerDisposable - newInnerDisposable <- outerCallbackObserve message - putMVar innerDisposableMVar newInnerDisposable - outerCallbackObserve :: ObservableMessage i -> IO (Maybe Disposable) - outerCallbackObserve (ObservableUpdate innerObservable) = Just <$> observe innerObservable callback - outerCallbackObserve ObservableConnecting = Nothing <$ callback ObservableConnecting - outerCallbackObserve (ObservableReconnecting ex) = Nothing <$ callback (ObservableReconnecting ex) - outerCallbackObserve (ObservableNotAvailable ex) = Nothing <$ callback (ObservableNotAvailable ex) + outerCallback :: ResourceManager -> MVar JoinedObservableState -> ObservableMessage i -> IO () + outerCallback resourceManager stateMVar message = do + oldState <- takeMVar stateMVar + disposeEventually resourceManager oldState + key <- newUnique + newState <- outerCallbackObserve key message + putMVar stateMVar newState + where + outerCallbackObserve :: Unique -> ObservableMessage i -> IO JoinedObservableState + outerCallbackObserve key (ObservableUpdate innerObservable) = JoinedObservableActive key <$> observe innerObservable (filteredCallback key) + outerCallbackObserve _ ObservableConnecting = JoinedObservableInactive <$ callback ObservableConnecting + outerCallbackObserve _ (ObservableReconnecting ex) = JoinedObservableInactive <$ callback (ObservableReconnecting ex) + outerCallbackObserve _ (ObservableNotAvailable ex) = JoinedObservableInactive <$ callback (ObservableNotAvailable ex) + filteredCallback :: Unique -> ObservableMessage v -> IO () + filteredCallback key msg = + -- TODO write a version that does not deadlock when `observe` calls the callback directly + undefined + withMVar stateMVar $ \case + JoinedObservableActive activeKey _ -> callback msg + _ -> pure () joinObservable :: (IsObservable i o, IsObservable v i) => o -> Observable v joinObservable = Observable . JoinedObservable @@ -255,20 +282,36 @@ mergeObservableMaybe :: (IsObservable (Maybe v0) o0, IsObservable (Maybe v1) o1) mergeObservableMaybe merge x y = Observable $ MergedObservable (liftA2 merge) x y --- | Data type that can be used as an implementation for the `IsObservable` interface that works by directly providing functions for `retrieve` and `subscribe`. data FnObservable v = FnObservable { - retrieveFn :: IO v, + retrieveFn :: forall m. HasResourceManager m => m (AsyncTask v), observeFn :: (ObservableMessage v -> IO ()) -> IO Disposable } instance IsRetrievable v (FnObservable v) where - retrieve o = liftIO $ successfulTask <$> retrieveFn o + retrieve o = retrieveFn o instance IsObservable v (FnObservable v) where observe o = observeFn o mapObservable f FnObservable{retrieveFn, observeFn} = Observable $ FnObservable { - retrieveFn = f <$> retrieveFn, + retrieveFn = f <<$>> retrieveFn, observeFn = \listener -> observeFn (listener . fmap f) } +-- | Implement an Observable by directly providing functions for `retrieve` and `subscribe`. +fnObservable + :: ((ObservableMessage v -> IO ()) -> IO Disposable) + -> (forall m. HasResourceManager m => m (AsyncTask v)) + -> Observable v +fnObservable observeFn retrieveFn = toObservable FnObservable{observeFn, retrieveFn} + +-- | Implement an Observable by directly providing functions for `retrieve` and `subscribe`. +synchronousFnObservable + :: forall v. ((ObservableMessage v -> IO ()) -> IO Disposable) + -> IO v + -> Observable v +synchronousFnObservable observeFn synchronousRetrieveFn = fnObservable observeFn retrieveFn + where + retrieveFn :: (forall m. HasResourceManager m => m (AsyncTask v)) + retrieveFn = liftIO $ successfulTask <$> synchronousRetrieveFn + newtype ConstObservable v = ConstObservable v instance IsRetrievable v (ConstObservable v) where diff --git a/src/Quasar/Observable/ObservableHashMap.hs b/src/Quasar/Observable/ObservableHashMap.hs index 3b12db267c7f61a2cf7481ca727f90e299e4c3d1..c183ffc158ab8a53b0d8220ed87c282989fb7d2c 100644 --- a/src/Quasar/Observable/ObservableHashMap.hs +++ b/src/Quasar/Observable/ObservableHashMap.hs @@ -115,7 +115,7 @@ new :: IO (ObservableHashMap k v) new = ObservableHashMap <$> newMVar Handle{keyHandles=HM.empty, subscribers=HM.empty, deltaSubscribers=HM.empty} observeKey :: forall k v. (Eq k, Hashable k) => k -> ObservableHashMap k v -> Observable (Maybe v) -observeKey key ohm@(ObservableHashMap mvar) = Observable FnObservable{retrieveFn, observeFn} +observeKey key ohm@(ObservableHashMap mvar) = synchronousFnObservable observeFn retrieveFn where retrieveFn :: IO (Maybe v) retrieveFn = liftIO $ join . preview (_keyHandles . at key . _Just . _value) <$> readMVar mvar