From 41cc75b25fd5a4accda4c215a900cedcb202517d Mon Sep 17 00:00:00 2001 From: Jens Nolte <git@queezle.net> Date: Sun, 22 Nov 2020 04:28:51 +0100 Subject: [PATCH] Use Disposable in more places This is a requirement for multi-hop network connections. --- src/lib/Qd/Observable.hs | 101 ++++++++++++++----- src/lib/Qd/Observable/ObservableHashMap.hs | 28 ++--- src/lib/Qd/Observable/ObservablePriority.hs | 10 +- test/Qd/Observable/ObservablePrioritySpec.hs | 8 +- 4 files changed, 100 insertions(+), 47 deletions(-) diff --git a/src/lib/Qd/Observable.hs b/src/lib/Qd/Observable.hs index 9b9f750..09d5e68 100644 --- a/src/lib/Qd/Observable.hs +++ b/src/lib/Qd/Observable.hs @@ -6,9 +6,10 @@ module Qd.Observable ( IsObservable(..), unsafeGetValue, subscribe', - SubscriptionHandle(..), - RegistrationHandle(..), + SubscriptionHandle, + RegistrationHandle, IsSettable(..), + Disposable(..), IsDisposable(..), ObservableCallback, ObservableMessage, @@ -29,6 +30,7 @@ module Qd.Observable ( constObservable, FnObservable(..), waitFor, + waitFor', ) where import Qd.Prelude @@ -42,11 +44,19 @@ import qualified Data.HashMap.Strict as HM import Data.IORef import Data.Unique -waitFor :: ((a -> IO ()) -> IO ()) -> IO a +waitFor :: forall a. ((a -> IO ()) -> IO ()) -> IO a waitFor action = do - result <- newEmptyMVar - action (putMVar result) - takeMVar result + mvar <- newEmptyMVar + action (callback mvar) + readMVar mvar + where + callback :: MVar a -> a -> IO () + callback mvar result = do + success <- tryPutMVar mvar result + unless success $ fail "Callback was called multiple times" + +waitFor' :: (IO () -> IO ()) -> IO () +waitFor' action = waitFor $ \callback -> action (callback ()) data MessageReason = Current | Update @@ -58,17 +68,54 @@ type ObservableMessage v = (MessageReason, v) mapObservableMessage :: Monad m => (a -> m b) -> ObservableMessage a -> m (ObservableMessage b) mapObservableMessage f (r, s) = (r, ) <$> f s -newtype SubscriptionHandle = SubscriptionHandle { unsubscribe :: IO () } -newtype RegistrationHandle = RegistrationHandle { deregister :: IO () } +type SubscriptionHandle = Disposable +type RegistrationHandle = Disposable + +data Disposable + = forall a. IsDisposable a => SomeDisposable a + | FunctionDisposable (IO () -> IO ()) + | MultiDisposable [Disposable] + | DummyDisposable + +instance IsDisposable Disposable where + dispose (SomeDisposable x) = dispose x + dispose (FunctionDisposable fn) = fn (return ()) + dispose d@(MultiDisposable _) = waitFor' $ dispose' d + dispose DummyDisposable = return () + dispose_ (SomeDisposable x) = dispose_ x + dispose_ (FunctionDisposable fn) = fn (return ()) + dispose_ d@(MultiDisposable _) = waitFor' $ dispose' d + dispose_ DummyDisposable = return () + dispose' (SomeDisposable x) = dispose' x + dispose' (FunctionDisposable fn) = fn + dispose' (MultiDisposable xs) = \disposeCallback -> do + mvars <- mapM startDispose xs + forM_ mvars $ readMVar + disposeCallback + where + startDispose :: Disposable -> IO (MVar ()) + startDispose disposable = do + mvar <- newEmptyMVar + dispose' disposable (callback mvar) + return (mvar :: MVar ()) + callback :: MVar () -> IO () + callback mvar = do + success <- tryPutMVar mvar () + unless success $ fail "Callback was called multiple times" + dispose' DummyDisposable = id class IsDisposable a where + -- | Dispose a resource. Blocks until the resource is released. dispose :: a -> IO () -instance IsDisposable SubscriptionHandle where - dispose = unsubscribe -instance IsDisposable RegistrationHandle where - dispose = deregister + dispose = waitFor' . dispose' + -- | Dispose a resource. Returns without waiting for the resource to be released. + dispose_ :: a -> IO () + dispose_ disposable = dispose' disposable (return ()) + -- | Dispose a resource. When the resource has been released the callback is invoked. + dispose' :: a -> IO () -> IO () instance IsDisposable a => IsDisposable (Maybe a) where - dispose = mapM_ dispose + dispose' (Just disposable) callback = dispose' disposable callback + dispose' Nothing callback = callback class IsGettable v a | a -> v where @@ -156,10 +203,12 @@ instance IsObservable v (ObservableVar v) where -- Call listener callback (Current, state) return (state, HM.insert key callback subscribers) - return $ SubscriptionHandle $ unsubscribe' key + return $ FunctionDisposable (disposeFn key) where - unsubscribe' :: Unique -> IO () - unsubscribe' key = modifyMVar_ mvar $ \(state, subscribers) -> return (state, HM.delete key subscribers) + disposeFn :: Unique -> IO () -> IO () + disposeFn key disposeCallback = do + modifyMVar_ mvar (\(state, subscribers) -> return (state, HM.delete key subscribers)) + disposeCallback instance IsSettable v (ObservableVar v) where setValue (ObservableVar mvar) value = modifyMVar_ mvar $ \(_, subscribers) -> do @@ -202,17 +251,17 @@ instance forall o i v. (IsGettable i o, IsGettable v i) => IsGettable v (JoinedO instance forall o i v. (IsObservable i o, IsObservable v i) => IsObservable v (JoinedObservable o) where subscribe :: (JoinedObservable o) -> (ObservableMessage v -> IO ()) -> IO SubscriptionHandle subscribe (JoinedObservable outer) callback = do - innerSubscriptionMVar <- newMVar dummySubscription + innerSubscriptionMVar <- newMVar DummyDisposable outerSubscription <- subscribe outer (outerCallback innerSubscriptionMVar) - return $ SubscriptionHandle{unsubscribe = unsubscribe outerSubscription >> readMVar innerSubscriptionMVar >>= dispose} + return $ FunctionDisposable (\disposeCallback -> dispose' outerSubscription (readMVar innerSubscriptionMVar >>= \innerSubscription -> dispose' innerSubscription disposeCallback)) where - dummySubscription = SubscriptionHandle { unsubscribe = return () } - outerCallback innerSubscriptionMVar = outerSubscription' + outerCallback innerSubscriptionMVar = outerCallback' where - outerSubscription' (_, inner) = do - unsubscribe =<< takeMVar innerSubscriptionMVar - innerSubscription <- subscribe inner callback - putMVar innerSubscriptionMVar innerSubscription + outerCallback' (_reason, innerObservable) = do + oldInnerSubscription <- takeMVar innerSubscriptionMVar + dispose' oldInnerSubscription $ do + newInnerSubscription <- subscribe innerObservable callback + putMVar innerSubscriptionMVar newInnerSubscription joinObservable :: (IsObservable i o, IsObservable v i) => o -> Observable v joinObservable = Observable . JoinedObservable @@ -243,7 +292,7 @@ instance forall o0 v0 o1 v1 r. (IsObservable v0 o0, IsObservable v1 o1) => IsObs currentValuesTupleRef <- newIORef (Nothing, Nothing) sub0 <- subscribe obs0 (mergeCallback currentValuesTupleRef . fmap Left) sub1 <- subscribe obs1 (mergeCallback currentValuesTupleRef . fmap Right) - return $ SubscriptionHandle{unsubscribe = unsubscribe sub0 >> unsubscribe sub1} + return $ MultiDisposable [sub0, sub1] where mergeCallback :: IORef (Maybe v0, Maybe v1) -> (MessageReason, Either v0 v1) -> IO () mergeCallback currentValuesTupleRef (reason, state) = do @@ -288,7 +337,7 @@ instance IsGettable a (ConstObservable a) where instance IsObservable a (ConstObservable a) where subscribe (ConstObservable x) callback = do callback (Current, x) - return $ SubscriptionHandle { unsubscribe = return () } + return DummyDisposable -- | Create an observable that contains a constant value. constObservable :: a -> Observable a constObservable = Observable . ConstObservable diff --git a/src/lib/Qd/Observable/ObservableHashMap.hs b/src/lib/Qd/Observable/ObservableHashMap.hs index 91c1588..14b7b97 100644 --- a/src/lib/Qd/Observable/ObservableHashMap.hs +++ b/src/lib/Qd/Observable/ObservableHashMap.hs @@ -48,9 +48,11 @@ instance IsObservable (HM.HashMap k v) (ObservableHashMap k v) where callback (Current, toHashMap handle) unique <- newUnique let handle' = handle & set (_subscribers . at unique) (Just callback) - return (handle', SubscriptionHandle $ unsubscribe unique) - unsubscribe :: Unique -> IO () - unsubscribe unique = modifyHandle_ (return . set (_subscribers . at unique) Nothing) ohm + return (handle', FunctionDisposable $ unsubscribe unique) + unsubscribe :: Unique -> IO () -> IO () + unsubscribe unique unsubscribedCallback = do + modifyHandle_ (return . set (_subscribers . at unique) Nothing) ohm + unsubscribedCallback instance IsDeltaObservable k v (ObservableHashMap k v) where subscribeDelta ohm callback = modifyHandle update ohm @@ -60,13 +62,11 @@ instance IsDeltaObservable k v (ObservableHashMap k v) where callback (Reset $ toHashMap handle) unique <- newUnique let handle' = handle & set (_deltaSubscribers . at unique) (Just callback) - return (handle', SubscriptionHandle $ unsubscribe unique) - unsubscribe :: Unique -> IO () - unsubscribe unique = modifyHandle_ (return . set (_deltaSubscribers . at unique) Nothing) ohm - --- TODO ---subscribeAbstraction :: SomeIndexedLens -> (a -> v) -> (IO (a, r) -> IO r) -> (v -> IO ()) -> IO r ---subscribeAbstraction setter getCurrent modifyMVar callback = modify $ do + return (handle', FunctionDisposable $ unsubscribe unique) + unsubscribe :: Unique -> IO () -> IO () + unsubscribe unique unsubscribedCallback = do + modifyHandle_ (return . set (_deltaSubscribers . at unique) Nothing) ohm + unsubscribedCallback toHashMap :: Handle k v -> HM.HashMap k v @@ -128,14 +128,16 @@ observeKey key ohm@(ObservableHashMap mvar) = Observable FnObservable{getValueFn subscribeFn callback = do subscriptionKey <- newUnique modifyKeyHandle_ (subscribeFn' subscriptionKey) key ohm - return $ SubscriptionHandle $ unsubscribe subscriptionKey + return $ FunctionDisposable $ unsubscribe subscriptionKey where subscribeFn' :: Unique -> KeyHandle v -> IO (KeyHandle v) subscribeFn' subKey keyHandle@KeyHandle{value} = do callback (Current, value) return $ modifyKeySubscribers (HM.insert subKey callback) keyHandle - unsubscribe :: Unique -> IO () - unsubscribe subKey = modifyKeyHandle_ (return . modifyKeySubscribers (HM.delete subKey)) key ohm + unsubscribe :: Unique -> IO () -> IO () + unsubscribe subKey unsubscribedCallback = do + modifyKeyHandle_ (return . modifyKeySubscribers (HM.delete subKey)) key ohm + unsubscribedCallback insert :: forall k v. (Eq k, Hashable k) => k -> v -> ObservableHashMap k v -> IO () insert key value = modifyKeyHandleNotifying_ fn key diff --git a/src/lib/Qd/Observable/ObservablePriority.hs b/src/lib/Qd/Observable/ObservablePriority.hs index 30e0c82..ee5e74c 100644 --- a/src/lib/Qd/Observable/ObservablePriority.hs +++ b/src/lib/Qd/Observable/ObservablePriority.hs @@ -33,10 +33,12 @@ instance IsObservable (Maybe v) (ObservablePriority p v) where -- Call listener callback (Current, currentValue internals) return internals{subscribers = HM.insert key callback subscribers} - return $ SubscriptionHandle $ unsubscribe' key + return $ FunctionDisposable (unsubscribe key) where - unsubscribe' :: Unique -> IO () - unsubscribe' key = modifyMVar_ mvar $ \internals@Internals{subscribers} -> return internals{subscribers=HM.delete key subscribers} + unsubscribe :: Unique -> IO () -> IO () + unsubscribe key disposeCallback = do + modifyMVar_ mvar $ \internals@Internals{subscribers} -> return internals{subscribers=HM.delete key subscribers} + disposeCallback type PriorityMap p v = HM.HashMap p (NonEmpty (Entry v)) @@ -63,7 +65,7 @@ insertValue :: forall p v. (Ord p, Hashable p) => ObservablePriority p v -> p -> insertValue (ObservablePriority mvar) priority value = modifyMVar mvar $ \internals -> do key <- newUnique newInternals <- insertValue' key internals - return (newInternals, RegistrationHandle {deregister=removeValue key}) + return (newInternals, FunctionDisposable (\callback -> removeValue key >> callback)) where insertValue' :: Unique -> Internals p v -> IO (Internals p v) insertValue' key internals@Internals{priorityMap, current} diff --git a/test/Qd/Observable/ObservablePrioritySpec.hs b/test/Qd/Observable/ObservablePrioritySpec.hs index 25b7984..d33701d 100644 --- a/test/Qd/Observable/ObservablePrioritySpec.hs +++ b/test/Qd/Observable/ObservablePrioritySpec.hs @@ -21,9 +21,9 @@ spec = do getValue op `shouldReturn` (Just "p2") p1 <- OP.insertValue op 1 "p1" getValue op `shouldReturn` (Just "p2") - deregister p2 + dispose p2 getValue op `shouldReturn` (Just "p1") - deregister p1 + dispose p1 getValue op `shouldReturn` (Nothing) it "sends updates when its value changes" $ do result <- newIORef [] @@ -37,9 +37,9 @@ spec = do mostRecentShouldBe (Update, Just "p2") p1 <- OP.insertValue op 1 "p1" mostRecentShouldBe (Update, Just "p2") - deregister p2 + dispose p2 mostRecentShouldBe (Update, Just "p1") - deregister p1 + dispose p1 mostRecentShouldBe (Update, Nothing) length <$> readIORef result `shouldReturn` 4 -- GitLab