Skip to content
Snippets Groups Projects
Commit 41cc75b2 authored by Jens Nolte's avatar Jens Nolte
Browse files

Use Disposable in more places

This is a requirement for multi-hop network connections.
parent ed1b398f
No related branches found
No related tags found
No related merge requests found
...@@ -6,9 +6,10 @@ module Qd.Observable ( ...@@ -6,9 +6,10 @@ module Qd.Observable (
IsObservable(..), IsObservable(..),
unsafeGetValue, unsafeGetValue,
subscribe', subscribe',
SubscriptionHandle(..), SubscriptionHandle,
RegistrationHandle(..), RegistrationHandle,
IsSettable(..), IsSettable(..),
Disposable(..),
IsDisposable(..), IsDisposable(..),
ObservableCallback, ObservableCallback,
ObservableMessage, ObservableMessage,
...@@ -29,6 +30,7 @@ module Qd.Observable ( ...@@ -29,6 +30,7 @@ module Qd.Observable (
constObservable, constObservable,
FnObservable(..), FnObservable(..),
waitFor, waitFor,
waitFor',
) where ) where
import Qd.Prelude import Qd.Prelude
...@@ -42,11 +44,19 @@ import qualified Data.HashMap.Strict as HM ...@@ -42,11 +44,19 @@ import qualified Data.HashMap.Strict as HM
import Data.IORef import Data.IORef
import Data.Unique import Data.Unique
waitFor :: ((a -> IO ()) -> IO ()) -> IO a waitFor :: forall a. ((a -> IO ()) -> IO ()) -> IO a
waitFor action = do waitFor action = do
result <- newEmptyMVar mvar <- newEmptyMVar
action (putMVar result) action (callback mvar)
takeMVar result readMVar mvar
where
callback :: MVar a -> a -> IO ()
callback mvar result = do
success <- tryPutMVar mvar result
unless success $ fail "Callback was called multiple times"
waitFor' :: (IO () -> IO ()) -> IO ()
waitFor' action = waitFor $ \callback -> action (callback ())
data MessageReason = Current | Update data MessageReason = Current | Update
...@@ -58,17 +68,54 @@ type ObservableMessage v = (MessageReason, v) ...@@ -58,17 +68,54 @@ type ObservableMessage v = (MessageReason, v)
mapObservableMessage :: Monad m => (a -> m b) -> ObservableMessage a -> m (ObservableMessage b) mapObservableMessage :: Monad m => (a -> m b) -> ObservableMessage a -> m (ObservableMessage b)
mapObservableMessage f (r, s) = (r, ) <$> f s mapObservableMessage f (r, s) = (r, ) <$> f s
newtype SubscriptionHandle = SubscriptionHandle { unsubscribe :: IO () } type SubscriptionHandle = Disposable
newtype RegistrationHandle = RegistrationHandle { deregister :: IO () } type RegistrationHandle = Disposable
data Disposable
= forall a. IsDisposable a => SomeDisposable a
| FunctionDisposable (IO () -> IO ())
| MultiDisposable [Disposable]
| DummyDisposable
instance IsDisposable Disposable where
dispose (SomeDisposable x) = dispose x
dispose (FunctionDisposable fn) = fn (return ())
dispose d@(MultiDisposable _) = waitFor' $ dispose' d
dispose DummyDisposable = return ()
dispose_ (SomeDisposable x) = dispose_ x
dispose_ (FunctionDisposable fn) = fn (return ())
dispose_ d@(MultiDisposable _) = waitFor' $ dispose' d
dispose_ DummyDisposable = return ()
dispose' (SomeDisposable x) = dispose' x
dispose' (FunctionDisposable fn) = fn
dispose' (MultiDisposable xs) = \disposeCallback -> do
mvars <- mapM startDispose xs
forM_ mvars $ readMVar
disposeCallback
where
startDispose :: Disposable -> IO (MVar ())
startDispose disposable = do
mvar <- newEmptyMVar
dispose' disposable (callback mvar)
return (mvar :: MVar ())
callback :: MVar () -> IO ()
callback mvar = do
success <- tryPutMVar mvar ()
unless success $ fail "Callback was called multiple times"
dispose' DummyDisposable = id
class IsDisposable a where class IsDisposable a where
-- | Dispose a resource. Blocks until the resource is released.
dispose :: a -> IO () dispose :: a -> IO ()
instance IsDisposable SubscriptionHandle where dispose = waitFor' . dispose'
dispose = unsubscribe -- | Dispose a resource. Returns without waiting for the resource to be released.
instance IsDisposable RegistrationHandle where dispose_ :: a -> IO ()
dispose = deregister dispose_ disposable = dispose' disposable (return ())
-- | Dispose a resource. When the resource has been released the callback is invoked.
dispose' :: a -> IO () -> IO ()
instance IsDisposable a => IsDisposable (Maybe a) where instance IsDisposable a => IsDisposable (Maybe a) where
dispose = mapM_ dispose dispose' (Just disposable) callback = dispose' disposable callback
dispose' Nothing callback = callback
class IsGettable v a | a -> v where class IsGettable v a | a -> v where
...@@ -156,10 +203,12 @@ instance IsObservable v (ObservableVar v) where ...@@ -156,10 +203,12 @@ instance IsObservable v (ObservableVar v) where
-- Call listener -- Call listener
callback (Current, state) callback (Current, state)
return (state, HM.insert key callback subscribers) return (state, HM.insert key callback subscribers)
return $ SubscriptionHandle $ unsubscribe' key return $ FunctionDisposable (disposeFn key)
where where
unsubscribe' :: Unique -> IO () disposeFn :: Unique -> IO () -> IO ()
unsubscribe' key = modifyMVar_ mvar $ \(state, subscribers) -> return (state, HM.delete key subscribers) disposeFn key disposeCallback = do
modifyMVar_ mvar (\(state, subscribers) -> return (state, HM.delete key subscribers))
disposeCallback
instance IsSettable v (ObservableVar v) where instance IsSettable v (ObservableVar v) where
setValue (ObservableVar mvar) value = modifyMVar_ mvar $ \(_, subscribers) -> do setValue (ObservableVar mvar) value = modifyMVar_ mvar $ \(_, subscribers) -> do
...@@ -202,17 +251,17 @@ instance forall o i v. (IsGettable i o, IsGettable v i) => IsGettable v (JoinedO ...@@ -202,17 +251,17 @@ instance forall o i v. (IsGettable i o, IsGettable v i) => IsGettable v (JoinedO
instance forall o i v. (IsObservable i o, IsObservable v i) => IsObservable v (JoinedObservable o) where instance forall o i v. (IsObservable i o, IsObservable v i) => IsObservable v (JoinedObservable o) where
subscribe :: (JoinedObservable o) -> (ObservableMessage v -> IO ()) -> IO SubscriptionHandle subscribe :: (JoinedObservable o) -> (ObservableMessage v -> IO ()) -> IO SubscriptionHandle
subscribe (JoinedObservable outer) callback = do subscribe (JoinedObservable outer) callback = do
innerSubscriptionMVar <- newMVar dummySubscription innerSubscriptionMVar <- newMVar DummyDisposable
outerSubscription <- subscribe outer (outerCallback innerSubscriptionMVar) outerSubscription <- subscribe outer (outerCallback innerSubscriptionMVar)
return $ SubscriptionHandle{unsubscribe = unsubscribe outerSubscription >> readMVar innerSubscriptionMVar >>= dispose} return $ FunctionDisposable (\disposeCallback -> dispose' outerSubscription (readMVar innerSubscriptionMVar >>= \innerSubscription -> dispose' innerSubscription disposeCallback))
where where
dummySubscription = SubscriptionHandle { unsubscribe = return () } outerCallback innerSubscriptionMVar = outerCallback'
outerCallback innerSubscriptionMVar = outerSubscription'
where where
outerSubscription' (_, inner) = do outerCallback' (_reason, innerObservable) = do
unsubscribe =<< takeMVar innerSubscriptionMVar oldInnerSubscription <- takeMVar innerSubscriptionMVar
innerSubscription <- subscribe inner callback dispose' oldInnerSubscription $ do
putMVar innerSubscriptionMVar innerSubscription newInnerSubscription <- subscribe innerObservable callback
putMVar innerSubscriptionMVar newInnerSubscription
joinObservable :: (IsObservable i o, IsObservable v i) => o -> Observable v joinObservable :: (IsObservable i o, IsObservable v i) => o -> Observable v
joinObservable = Observable . JoinedObservable joinObservable = Observable . JoinedObservable
...@@ -243,7 +292,7 @@ instance forall o0 v0 o1 v1 r. (IsObservable v0 o0, IsObservable v1 o1) => IsObs ...@@ -243,7 +292,7 @@ instance forall o0 v0 o1 v1 r. (IsObservable v0 o0, IsObservable v1 o1) => IsObs
currentValuesTupleRef <- newIORef (Nothing, Nothing) currentValuesTupleRef <- newIORef (Nothing, Nothing)
sub0 <- subscribe obs0 (mergeCallback currentValuesTupleRef . fmap Left) sub0 <- subscribe obs0 (mergeCallback currentValuesTupleRef . fmap Left)
sub1 <- subscribe obs1 (mergeCallback currentValuesTupleRef . fmap Right) sub1 <- subscribe obs1 (mergeCallback currentValuesTupleRef . fmap Right)
return $ SubscriptionHandle{unsubscribe = unsubscribe sub0 >> unsubscribe sub1} return $ MultiDisposable [sub0, sub1]
where where
mergeCallback :: IORef (Maybe v0, Maybe v1) -> (MessageReason, Either v0 v1) -> IO () mergeCallback :: IORef (Maybe v0, Maybe v1) -> (MessageReason, Either v0 v1) -> IO ()
mergeCallback currentValuesTupleRef (reason, state) = do mergeCallback currentValuesTupleRef (reason, state) = do
...@@ -288,7 +337,7 @@ instance IsGettable a (ConstObservable a) where ...@@ -288,7 +337,7 @@ instance IsGettable a (ConstObservable a) where
instance IsObservable a (ConstObservable a) where instance IsObservable a (ConstObservable a) where
subscribe (ConstObservable x) callback = do subscribe (ConstObservable x) callback = do
callback (Current, x) callback (Current, x)
return $ SubscriptionHandle { unsubscribe = return () } return DummyDisposable
-- | Create an observable that contains a constant value. -- | Create an observable that contains a constant value.
constObservable :: a -> Observable a constObservable :: a -> Observable a
constObservable = Observable . ConstObservable constObservable = Observable . ConstObservable
......
...@@ -48,9 +48,11 @@ instance IsObservable (HM.HashMap k v) (ObservableHashMap k v) where ...@@ -48,9 +48,11 @@ instance IsObservable (HM.HashMap k v) (ObservableHashMap k v) where
callback (Current, toHashMap handle) callback (Current, toHashMap handle)
unique <- newUnique unique <- newUnique
let handle' = handle & set (_subscribers . at unique) (Just callback) let handle' = handle & set (_subscribers . at unique) (Just callback)
return (handle', SubscriptionHandle $ unsubscribe unique) return (handle', FunctionDisposable $ unsubscribe unique)
unsubscribe :: Unique -> IO () unsubscribe :: Unique -> IO () -> IO ()
unsubscribe unique = modifyHandle_ (return . set (_subscribers . at unique) Nothing) ohm unsubscribe unique unsubscribedCallback = do
modifyHandle_ (return . set (_subscribers . at unique) Nothing) ohm
unsubscribedCallback
instance IsDeltaObservable k v (ObservableHashMap k v) where instance IsDeltaObservable k v (ObservableHashMap k v) where
subscribeDelta ohm callback = modifyHandle update ohm subscribeDelta ohm callback = modifyHandle update ohm
...@@ -60,13 +62,11 @@ instance IsDeltaObservable k v (ObservableHashMap k v) where ...@@ -60,13 +62,11 @@ instance IsDeltaObservable k v (ObservableHashMap k v) where
callback (Reset $ toHashMap handle) callback (Reset $ toHashMap handle)
unique <- newUnique unique <- newUnique
let handle' = handle & set (_deltaSubscribers . at unique) (Just callback) let handle' = handle & set (_deltaSubscribers . at unique) (Just callback)
return (handle', SubscriptionHandle $ unsubscribe unique) return (handle', FunctionDisposable $ unsubscribe unique)
unsubscribe :: Unique -> IO () unsubscribe :: Unique -> IO () -> IO ()
unsubscribe unique = modifyHandle_ (return . set (_deltaSubscribers . at unique) Nothing) ohm unsubscribe unique unsubscribedCallback = do
modifyHandle_ (return . set (_deltaSubscribers . at unique) Nothing) ohm
-- TODO unsubscribedCallback
--subscribeAbstraction :: SomeIndexedLens -> (a -> v) -> (IO (a, r) -> IO r) -> (v -> IO ()) -> IO r
--subscribeAbstraction setter getCurrent modifyMVar callback = modify $ do
toHashMap :: Handle k v -> HM.HashMap k v toHashMap :: Handle k v -> HM.HashMap k v
...@@ -128,14 +128,16 @@ observeKey key ohm@(ObservableHashMap mvar) = Observable FnObservable{getValueFn ...@@ -128,14 +128,16 @@ observeKey key ohm@(ObservableHashMap mvar) = Observable FnObservable{getValueFn
subscribeFn callback = do subscribeFn callback = do
subscriptionKey <- newUnique subscriptionKey <- newUnique
modifyKeyHandle_ (subscribeFn' subscriptionKey) key ohm modifyKeyHandle_ (subscribeFn' subscriptionKey) key ohm
return $ SubscriptionHandle $ unsubscribe subscriptionKey return $ FunctionDisposable $ unsubscribe subscriptionKey
where where
subscribeFn' :: Unique -> KeyHandle v -> IO (KeyHandle v) subscribeFn' :: Unique -> KeyHandle v -> IO (KeyHandle v)
subscribeFn' subKey keyHandle@KeyHandle{value} = do subscribeFn' subKey keyHandle@KeyHandle{value} = do
callback (Current, value) callback (Current, value)
return $ modifyKeySubscribers (HM.insert subKey callback) keyHandle return $ modifyKeySubscribers (HM.insert subKey callback) keyHandle
unsubscribe :: Unique -> IO () unsubscribe :: Unique -> IO () -> IO ()
unsubscribe subKey = modifyKeyHandle_ (return . modifyKeySubscribers (HM.delete subKey)) key ohm unsubscribe subKey unsubscribedCallback = do
modifyKeyHandle_ (return . modifyKeySubscribers (HM.delete subKey)) key ohm
unsubscribedCallback
insert :: forall k v. (Eq k, Hashable k) => k -> v -> ObservableHashMap k v -> IO () insert :: forall k v. (Eq k, Hashable k) => k -> v -> ObservableHashMap k v -> IO ()
insert key value = modifyKeyHandleNotifying_ fn key insert key value = modifyKeyHandleNotifying_ fn key
......
...@@ -33,10 +33,12 @@ instance IsObservable (Maybe v) (ObservablePriority p v) where ...@@ -33,10 +33,12 @@ instance IsObservable (Maybe v) (ObservablePriority p v) where
-- Call listener -- Call listener
callback (Current, currentValue internals) callback (Current, currentValue internals)
return internals{subscribers = HM.insert key callback subscribers} return internals{subscribers = HM.insert key callback subscribers}
return $ SubscriptionHandle $ unsubscribe' key return $ FunctionDisposable (unsubscribe key)
where where
unsubscribe' :: Unique -> IO () unsubscribe :: Unique -> IO () -> IO ()
unsubscribe' key = modifyMVar_ mvar $ \internals@Internals{subscribers} -> return internals{subscribers=HM.delete key subscribers} unsubscribe key disposeCallback = do
modifyMVar_ mvar $ \internals@Internals{subscribers} -> return internals{subscribers=HM.delete key subscribers}
disposeCallback
type PriorityMap p v = HM.HashMap p (NonEmpty (Entry v)) type PriorityMap p v = HM.HashMap p (NonEmpty (Entry v))
...@@ -63,7 +65,7 @@ insertValue :: forall p v. (Ord p, Hashable p) => ObservablePriority p v -> p -> ...@@ -63,7 +65,7 @@ insertValue :: forall p v. (Ord p, Hashable p) => ObservablePriority p v -> p ->
insertValue (ObservablePriority mvar) priority value = modifyMVar mvar $ \internals -> do insertValue (ObservablePriority mvar) priority value = modifyMVar mvar $ \internals -> do
key <- newUnique key <- newUnique
newInternals <- insertValue' key internals newInternals <- insertValue' key internals
return (newInternals, RegistrationHandle {deregister=removeValue key}) return (newInternals, FunctionDisposable (\callback -> removeValue key >> callback))
where where
insertValue' :: Unique -> Internals p v -> IO (Internals p v) insertValue' :: Unique -> Internals p v -> IO (Internals p v)
insertValue' key internals@Internals{priorityMap, current} insertValue' key internals@Internals{priorityMap, current}
......
...@@ -21,9 +21,9 @@ spec = do ...@@ -21,9 +21,9 @@ spec = do
getValue op `shouldReturn` (Just "p2") getValue op `shouldReturn` (Just "p2")
p1 <- OP.insertValue op 1 "p1" p1 <- OP.insertValue op 1 "p1"
getValue op `shouldReturn` (Just "p2") getValue op `shouldReturn` (Just "p2")
deregister p2 dispose p2
getValue op `shouldReturn` (Just "p1") getValue op `shouldReturn` (Just "p1")
deregister p1 dispose p1
getValue op `shouldReturn` (Nothing) getValue op `shouldReturn` (Nothing)
it "sends updates when its value changes" $ do it "sends updates when its value changes" $ do
result <- newIORef [] result <- newIORef []
...@@ -37,9 +37,9 @@ spec = do ...@@ -37,9 +37,9 @@ spec = do
mostRecentShouldBe (Update, Just "p2") mostRecentShouldBe (Update, Just "p2")
p1 <- OP.insertValue op 1 "p1" p1 <- OP.insertValue op 1 "p1"
mostRecentShouldBe (Update, Just "p2") mostRecentShouldBe (Update, Just "p2")
deregister p2 dispose p2
mostRecentShouldBe (Update, Just "p1") mostRecentShouldBe (Update, Just "p1")
deregister p1 dispose p1
mostRecentShouldBe (Update, Nothing) mostRecentShouldBe (Update, Nothing)
length <$> readIORef result `shouldReturn` 4 length <$> readIORef result `shouldReturn` 4
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment