diff --git a/src/Quasar/Observable.hs b/src/Quasar/Observable.hs index 4a4f812b4d1ed9aedde0b7f71bfb7589183bf67f..b14c79def08eb30eba2e904cd8cea2533c3c2535 100644 --- a/src/Quasar/Observable.hs +++ b/src/Quasar/Observable.hs @@ -1,64 +1,81 @@ {-# LANGUAGE UndecidableInstances #-} module Quasar.Observable ( - Observable(..), + -- * Observable core types IsRetrievable(..), retrieveIO, IsObservable(..), - unsafeGetBlocking, - subscribe', - IsSettable(..), - ObservableCallback, - ObservableMessage, - MessageReason(..), + Observable(..), + ObservableMessage(..), + + -- * ObservableVar ObservableVar, newObservableVar, + setObservableVar, withObservableVar, modifyObservableVar, modifyObservableVar_, - bindObservable, + + -- * Helper functions + mergeObservable, + mergeObservableMaybe, joinObservable, joinObservableMaybe, joinObservableMaybe', joinObservableEither, joinObservableEither', - mergeObservable, - mergeObservableMaybe, - constObservable, + bindObservable, + + -- * Helper types FnObservable(..), + ObservableCallback, ) where import Control.Concurrent.MVar +import Control.Concurrent.STM import Control.Monad.Except import Control.Monad.Trans.Maybe -import Data.Binary (Binary) import Data.HashMap.Strict qualified as HM import Data.IORef import Data.Unique +import Quasar.Awaitable import Quasar.Core import Quasar.Disposable import Quasar.Prelude -data MessageReason = Current | Update - deriving stock (Eq, Show, Generic) -instance Binary MessageReason +data ObservableMessage a + = ObservableUpdate a + | ObservableConnecting + | ObservableReconnecting SomeException + | ObservableNotAvailable SomeException + deriving stock (Show, Generic) -type ObservableMessage v = (MessageReason, v) +instance Functor ObservableMessage where + fmap fn (ObservableUpdate x) = ObservableUpdate (fn x) + fmap _ ObservableConnecting = ObservableConnecting + fmap _ (ObservableReconnecting ex) = ObservableReconnecting ex + fmap _ (ObservableNotAvailable ex) = ObservableNotAvailable ex -mapObservableMessage :: (a -> b) -> ObservableMessage a -> ObservableMessage b -mapObservableMessage f (reason, x) = (reason, f x) +instance Applicative ObservableMessage where + pure = ObservableUpdate + liftA2 _ (ObservableNotAvailable ex) _ = ObservableNotAvailable ex + liftA2 _ _ (ObservableNotAvailable ex) = ObservableNotAvailable ex + liftA2 _ (ObservableReconnecting ex) _ = ObservableReconnecting ex + liftA2 _ _ (ObservableReconnecting ex) = ObservableReconnecting ex + liftA2 _ ObservableConnecting _ = ObservableConnecting + liftA2 _ _ ObservableConnecting = ObservableConnecting + liftA2 fn (ObservableUpdate x) (ObservableUpdate y) = ObservableUpdate (fn x y) class IsRetrievable v a | a -> v where - retrieve :: a -> AsyncIO v + retrieve :: HasResourceManager m => a -> m (AsyncTask v) retrieveIO :: IsRetrievable v a => a -> IO v -retrieveIO = runAsyncIO . retrieve - +retrieveIO x = awaitIO =<< withDefaultResourceManager (retrieve x) class IsRetrievable v o => IsObservable v o | o -> v where - subscribe :: o -> (ObservableMessage v -> IO ()) -> IO Disposable + observe :: o -> (ObservableMessage v -> IO ()) -> IO Disposable toObservable :: o -> Observable v toObservable = Observable @@ -66,30 +83,23 @@ class IsRetrievable v o => IsObservable v o | o -> v where mapObservable :: (v -> a) -> o -> Observable a mapObservable f = Observable . MappedObservable f --- | Variant of `retrieveIO` that throws exceptions instead of returning them. -unsafeGetBlocking :: (Exception e, IsObservable (Either e v) o) => o -> IO v -unsafeGetBlocking = either throwIO pure <=< retrieveIO - --- | A variant of `subscribe` that passes the `Disposable` to the callback. -subscribe' :: IsObservable v o => o -> (Disposable -> ObservableMessage v -> IO ()) -> IO Disposable -subscribe' observable callback = mfix $ \subscription -> subscribe observable (callback subscription) +-- TODO needs a name +-- | A variant of `observe` that passes the `Disposable` to the callback. +observeWithDisposablePassedToTheCallback :: IsObservable v o => o -> (Disposable -> ObservableMessage v -> IO ()) -> IO Disposable +observeWithDisposablePassedToTheCallback observable callback = mfix $ \disposable -> observe observable (callback disposable) type ObservableCallback v = ObservableMessage v -> IO () instance IsRetrievable v o => IsRetrievable v (IO o) where - retrieve :: IO o -> AsyncIO v + retrieve :: HasResourceManager m => IO o -> m (AsyncTask v) retrieve = retrieve <=< liftIO instance IsObservable v o => IsObservable v (IO o) where - subscribe :: IO o -> (ObservableMessage v -> IO ()) -> IO Disposable - subscribe getObservable callback = do + observe :: IO o -> (ObservableMessage v -> IO ()) -> IO Disposable + observe getObservable callback = do observable <- getObservable - subscribe observable callback - - -class IsSettable v a | a -> v where - setValue :: a -> v -> IO () + observe observable callback -- | Existential quantification wrapper for the IsObservable type class. @@ -97,71 +107,67 @@ data Observable v = forall o. IsObservable v o => Observable o instance IsRetrievable v (Observable v) where retrieve (Observable o) = retrieve o instance IsObservable v (Observable v) where - subscribe (Observable o) = subscribe o + observe (Observable o) = observe o toObservable = id mapObservable f (Observable o) = mapObservable f o instance Functor Observable where fmap f = mapObservable f - x <$ _ = constObservable x instance Applicative Observable where pure = constObservable liftA2 = mergeObservable - _ *> x = x - x <* _ = x instance Monad Observable where (>>=) = bindObservable data MappedObservable b = forall a o. IsObservable a o => MappedObservable (a -> b) o instance IsRetrievable v (MappedObservable v) where - retrieve (MappedObservable f observable) = f <$> retrieve observable + retrieve (MappedObservable f observable) = f <<$>> retrieve observable instance IsObservable v (MappedObservable v) where - subscribe (MappedObservable f observable) callback = subscribe observable (callback . mapObservableMessage f) + observe (MappedObservable f observable) callback = observe observable (callback . fmap f) mapObservable f1 (MappedObservable f2 upstream) = Observable $ MappedObservable (f1 . f2) upstream newtype ObservableVar v = ObservableVar (MVar (v, HM.HashMap Unique (ObservableCallback v))) instance IsRetrievable v (ObservableVar v) where - retrieve (ObservableVar mvar) = liftIO $ fst <$> readMVar mvar + retrieve (ObservableVar mvar) = liftIO $ successfulTask . fst <$> readMVar mvar instance IsObservable v (ObservableVar v) where - subscribe (ObservableVar mvar) callback = do + observe (ObservableVar mvar) callback = do key <- newUnique modifyMVar_ mvar $ \(state, subscribers) -> do -- Call listener - callback (Current, state) + callback (pure state) pure (state, HM.insert key callback subscribers) pure $ synchronousDisposable (disposeFn key) where disposeFn :: Unique -> IO () disposeFn key = modifyMVar_ mvar (\(state, subscribers) -> pure (state, HM.delete key subscribers)) -instance IsSettable v (ObservableVar v) where - setValue (ObservableVar mvar) value = modifyMVar_ mvar $ \(_, subscribers) -> do - mapM_ (\callback -> callback (Update, value)) subscribers - pure (value, subscribers) - - newObservableVar :: v -> IO (ObservableVar v) newObservableVar initialValue = do ObservableVar <$> newMVar (initialValue, HM.empty) +setObservableVar :: ObservableVar v -> v -> IO () +setObservableVar (ObservableVar mvar) value = modifyMVar_ mvar $ \(_, subscribers) -> do + mapM_ (\callback -> callback (pure value)) subscribers + pure (value, subscribers) + modifyObservableVar :: ObservableVar v -> (v -> IO (v, a)) -> IO a modifyObservableVar (ObservableVar mvar) f = modifyMVar mvar $ \(oldState, subscribers) -> do (newState, result) <- f oldState - mapM_ (\callback -> callback (Update, newState)) subscribers + mapM_ (\callback -> callback (pure newState)) subscribers pure ((newState, subscribers), result) modifyObservableVar_ :: ObservableVar v -> (v -> IO v) -> IO () modifyObservableVar_ (ObservableVar mvar) f = modifyMVar_ mvar $ \(oldState, subscribers) -> do newState <- f oldState - mapM_ (\callback -> callback (Update, newState)) subscribers + mapM_ (\callback -> callback (pure newState)) subscribers pure (newState, subscribers) -withObservableVar :: ObservableVar a -> (a -> IO b) -> IO b +withObservableVar :: ObservableVar v -> (v -> IO a) -> IO a withObservableVar (ObservableVar mvar) f = withMVar mvar (f . fst) @@ -171,32 +177,37 @@ bindObservable x fy = joinObservable $ mapObservable fy x newtype JoinedObservable o = JoinedObservable o -instance forall o i v. (IsRetrievable i o, IsRetrievable v i) => IsRetrievable v (JoinedObservable o) where - retrieve :: JoinedObservable o -> AsyncIO v - retrieve (JoinedObservable outer) = retrieve =<< retrieve outer -instance forall o i v. (IsObservable i o, IsObservable v i) => IsObservable v (JoinedObservable o) where - subscribe :: (JoinedObservable o) -> (ObservableMessage v -> IO ()) -> IO Disposable - subscribe (JoinedObservable outer) callback = do +instance forall v o i. (IsRetrievable i o, IsRetrievable v i) => IsRetrievable v (JoinedObservable o) where + retrieve :: HasResourceManager m => JoinedObservable o -> m (AsyncTask v) + retrieve (JoinedObservable outer) = async $ await =<< retrieve =<< await =<< retrieve outer +instance forall v o i. (IsObservable i o, IsObservable v i) => IsObservable v (JoinedObservable o) where + observe :: JoinedObservable o -> (ObservableMessage v -> IO ()) -> IO Disposable + observe (JoinedObservable outer) callback = do -- TODO: rewrite with latest semantics -- the current implementation blocks the callback while `dispose` is running - innerDisposableMVar <- newMVar noDisposable - outerDisposable <- subscribe outer (outerCallback innerDisposableMVar) + innerDisposableMVar <- newMVar Nothing + outerDisposable <- observe outer (outerCallback innerDisposableMVar) pure $ mkDisposable $ do dispose outerDisposable - dispose =<< liftIO (readMVar innerDisposableMVar) + mapM_ dispose =<< liftIO (readMVar innerDisposableMVar) where - outerCallback :: MVar Disposable -> ObservableMessage i -> IO () - outerCallback innerDisposableMVar (_reason, innerObservable) = do - oldInnerSubscription <- takeMVar innerDisposableMVar - disposeIO oldInnerSubscription - newInnerSubscription <- subscribe innerObservable callback - putMVar innerDisposableMVar newInnerSubscription + outerCallback :: MVar (Maybe Disposable) -> ObservableMessage i -> IO () + outerCallback innerDisposableMVar message = do + oldInnerDisposable <- takeMVar innerDisposableMVar + mapM_ disposeIO oldInnerDisposable + newInnerDisposable <- outerCallbackObserve message + putMVar innerDisposableMVar newInnerDisposable + outerCallbackObserve :: ObservableMessage i -> IO (Maybe Disposable) + outerCallbackObserve (ObservableUpdate innerObservable) = Just <$> observe innerObservable callback + outerCallbackObserve ObservableConnecting = Nothing <$ callback ObservableConnecting + outerCallbackObserve (ObservableReconnecting ex) = Nothing <$ callback (ObservableReconnecting ex) + outerCallbackObserve (ObservableNotAvailable ex) = Nothing <$ callback (ObservableNotAvailable ex) joinObservable :: (IsObservable i o, IsObservable v i) => o -> Observable v joinObservable = Observable . JoinedObservable -joinObservableMaybe :: forall o i v. (IsObservable (Maybe i) o, IsObservable v i) => o -> Observable (Maybe v) +joinObservableMaybe :: forall v o i. (IsObservable (Maybe i) o, IsObservable v i) => o -> Observable (Maybe v) joinObservableMaybe = runMaybeT . join . fmap (MaybeT . fmap Just . toObservable) . MaybeT . toObservable joinObservableMaybe' :: (IsObservable (Maybe i) o, IsObservable (Maybe v) i) => o -> Observable (Maybe v) @@ -210,28 +221,28 @@ joinObservableEither' :: (IsObservable (Either e i) o, IsObservable (Either e v) joinObservableEither' = runExceptT . join . fmap (ExceptT . toObservable) . ExceptT . toObservable -data MergedObservable o0 v0 o1 v1 r = MergedObservable (v0 -> v1 -> r) o0 o1 -instance forall o0 v0 o1 v1 r. (IsRetrievable v0 o0, IsRetrievable v1 o1) => IsRetrievable r (MergedObservable o0 v0 o1 v1 r) where - retrieve (MergedObservable merge obs0 obs1) = merge <$> retrieve obs0 <*> retrieve obs1 -instance forall o0 v0 o1 v1 r. (IsObservable v0 o0, IsObservable v1 o1) => IsObservable r (MergedObservable o0 v0 o1 v1 r) where - subscribe (MergedObservable merge obs0 obs1) callback = do - currentValuesTupleRef <- newIORef (Nothing, Nothing) - sub0 <- subscribe obs0 (mergeCallback currentValuesTupleRef . fmap Left) - sub1 <- subscribe obs1 (mergeCallback currentValuesTupleRef . fmap Right) - pure $ mconcat [sub0, sub1] +data MergedObservable r o0 v0 o1 v1 = MergedObservable (v0 -> v1 -> r) o0 o1 +instance forall r o0 v0 o1 v1. (IsRetrievable v0 o0, IsRetrievable v1 o1) => IsRetrievable r (MergedObservable r o0 v0 o1 v1) where + retrieve (MergedObservable merge obs0 obs1) = liftA2 (liftA2 merge) (retrieve obs0) (retrieve obs1) +instance forall r o0 v0 o1 v1. (IsObservable v0 o0, IsObservable v1 o1) => IsObservable r (MergedObservable r o0 v0 o1 v1) where + observe (MergedObservable merge obs0 obs1) callback = do + var0 <- newTVarIO Nothing + var1 <- newTVarIO Nothing + d0 <- observe obs0 (mergeCallback var0 var1 . writeTVar var0 . Just) + d1 <- observe obs1 (mergeCallback var0 var1 . writeTVar var1 . Just) + pure $ mconcat [d0, d1] where - mergeCallback :: IORef (Maybe v0, Maybe v1) -> (MessageReason, Either v0 v1) -> IO () - mergeCallback currentValuesTupleRef (reason, state) = do - currentTuple <- atomicModifyIORef' currentValuesTupleRef ((\x -> (x, x)) . updateTuple state) - case currentTuple of - (Just l, Just r) -> callback (reason, uncurry merge (l, r)) - _ -> pure () -- Start only once both values have been received - updateTuple :: Either v0 v1 -> (Maybe v0, Maybe v1) -> (Maybe v0, Maybe v1) - updateTuple (Left l) (_, r) = (Just l, r) - updateTuple (Right r) (l, _) = (l, Just r) - - --- | Merge two observables using a given merge function. Whenever the value of one of the inputs changes, the resulting observable updates according to the merge function. + mergeCallback :: TVar (Maybe (ObservableMessage v0)) -> TVar (Maybe (ObservableMessage v1)) -> STM () -> IO () + mergeCallback var0 var1 update = do + mMerged <- atomically $ do + update + runMaybeT $ liftA2 (liftA2 merge) (MaybeT (readTVar var0)) (MaybeT (readTVar var1)) + + -- Run the callback only once both values have been received + mapM_ callback mMerged + + +-- | Merge two observables using a given merge function. Whenever one of the inputs is updated, the resulting observable updates according to the merge function. -- -- There is no caching involed, every subscriber effectively subscribes to both input observables. mergeObservable :: (IsObservable v0 o0, IsObservable v1 o1) => (v0 -> v1 -> r) -> o0 -> o1 -> Observable r @@ -244,29 +255,29 @@ mergeObservableMaybe merge x y = Observable $ MergedObservable (liftA2 merge) x -- | Data type that can be used as an implementation for the `IsObservable` interface that works by directly providing functions for `retrieve` and `subscribe`. data FnObservable v = FnObservable { - getValueFn :: AsyncIO v, - subscribeFn :: (ObservableMessage v -> IO ()) -> IO Disposable + retrieveFn :: IO v, + observeFn :: (ObservableMessage v -> IO ()) -> IO Disposable } instance IsRetrievable v (FnObservable v) where - retrieve o = getValueFn o + retrieve o = liftIO $ successfulTask <$> retrieveFn o instance IsObservable v (FnObservable v) where - subscribe o = subscribeFn o - mapObservable f FnObservable{getValueFn, subscribeFn} = Observable $ FnObservable { - getValueFn = f <$> getValueFn, - subscribeFn = \listener -> subscribeFn (listener . mapObservableMessage f) + observe o = observeFn o + mapObservable f FnObservable{retrieveFn, observeFn} = Observable $ FnObservable { + retrieveFn = f <$> retrieveFn, + observeFn = \listener -> observeFn (listener . fmap f) } -newtype ConstObservable a = ConstObservable a -instance IsRetrievable a (ConstObservable a) where - retrieve (ConstObservable x) = pure x +newtype ConstObservable v = ConstObservable v +instance IsRetrievable v (ConstObservable v) where + retrieve (ConstObservable x) = pure $ pure x instance IsObservable a (ConstObservable a) where - subscribe (ConstObservable x) callback = do - callback (Current, x) + observe (ConstObservable x) callback = do + callback $ ObservableUpdate x pure noDisposable -- | Create an observable that contains a constant value. -constObservable :: a -> Observable a +constObservable :: v -> Observable v constObservable = Observable . ConstObservable diff --git a/src/Quasar/Observable/Delta.hs b/src/Quasar/Observable/Delta.hs index 3adb77d2ae326beb603b5dcbcb6b922833d7f4c4..8c099fa1e93eb3a1e08c2ba34956f0062c17116b 100644 --- a/src/Quasar/Observable/Delta.hs +++ b/src/Quasar/Observable/Delta.hs @@ -49,7 +49,7 @@ data DeltaObservable k v = forall o. IsDeltaObservable k v o => DeltaObservable instance IsRetrievable (HM.HashMap k v) (DeltaObservable k v) where retrieve (DeltaObservable o) = retrieve o instance IsObservable (HM.HashMap k v) (DeltaObservable k v) where - subscribe (DeltaObservable o) = subscribe o + observe (DeltaObservable o) = observe o instance IsDeltaObservable k v (DeltaObservable k v) where subscribeDelta (DeltaObservable o) = subscribeDelta o instance Functor (DeltaObservable k) where @@ -58,8 +58,8 @@ instance Functor (DeltaObservable k) where data MappedDeltaObservable k b = forall a o. IsDeltaObservable k a o => MappedDeltaObservable (a -> b) o instance IsRetrievable (HM.HashMap k b) (MappedDeltaObservable k b) where - retrieve (MappedDeltaObservable f o) = f <<$>> retrieve o + retrieve (MappedDeltaObservable f o) = fmap f <<$>> retrieve o instance IsObservable (HM.HashMap k b) (MappedDeltaObservable k b) where - subscribe (MappedDeltaObservable f o) callback = subscribe o (callback . fmap (fmap f)) + observe (MappedDeltaObservable f o) callback = observe o (callback . fmap (fmap f)) instance IsDeltaObservable k b (MappedDeltaObservable k b) where subscribeDelta (MappedDeltaObservable f o) callback = subscribeDelta o (callback . fmap f) diff --git a/src/Quasar/Observable/ObservableHashMap.hs b/src/Quasar/Observable/ObservableHashMap.hs index a15712897232ab40a14edb81abf22933531372ea..3b12db267c7f61a2cf7481ca727f90e299e4c3d1 100644 --- a/src/Quasar/Observable/ObservableHashMap.hs +++ b/src/Quasar/Observable/ObservableHashMap.hs @@ -14,7 +14,7 @@ import Data.HashMap.Strict qualified as HM import Data.Maybe (isJust) import Language.Haskell.TH.Syntax (mkName, nameBase) import Lens.Micro.Platform -import Quasar.Core +import Quasar.Awaitable import Quasar.Disposable import Quasar.Observable import Quasar.Observable.Delta @@ -38,13 +38,13 @@ makeLensesWith (lensField .~ (\_ _ -> pure . TopName . mkName . ("_" <>) . nameB makeLensesWith (lensField .~ (\_ _ -> pure . TopName . mkName . ("_" <>) . nameBase) $ lensRules) ''KeyHandle instance IsRetrievable (HM.HashMap k v) (ObservableHashMap k v) where - retrieve (ObservableHashMap mvar) = liftIO $ HM.mapMaybe value . keyHandles <$> readMVar mvar + retrieve (ObservableHashMap mvar) = liftIO $ pure . HM.mapMaybe value . keyHandles <$> readMVar mvar instance IsObservable (HM.HashMap k v) (ObservableHashMap k v) where - subscribe ohm callback = modifyHandle update ohm + observe ohm callback = liftIO $ modifyHandle update ohm where update :: Handle k v -> IO (Handle k v, Disposable) update handle = do - callback (Current, toHashMap handle) + callback $ pure $ toHashMap handle unique <- newUnique let handle' = handle & set (_subscribers . at unique) (Just callback) pure (handle', synchronousDisposable (unsubscribe unique)) @@ -106,7 +106,7 @@ notifySubscribers :: Handle k v -> Maybe (Delta k v) -> IO () notifySubscribers _ Nothing = pure () notifySubscribers handle@Handle{deltaSubscribers, subscribers} (Just delta) = do mapM_ ($ delta) $ HM.elems deltaSubscribers - mapM_ ($ (Update, toHashMap handle)) $ HM.elems subscribers + mapM_ ($ pure (toHashMap handle)) $ HM.elems subscribers modifyKeySubscribers :: (HM.HashMap Unique (ObservableMessage (Maybe v) -> IO ()) -> HM.HashMap Unique (ObservableMessage (Maybe v) -> IO ())) -> KeyHandle v -> KeyHandle v modifyKeySubscribers = over _keySubscribers @@ -115,19 +115,19 @@ new :: IO (ObservableHashMap k v) new = ObservableHashMap <$> newMVar Handle{keyHandles=HM.empty, subscribers=HM.empty, deltaSubscribers=HM.empty} observeKey :: forall k v. (Eq k, Hashable k) => k -> ObservableHashMap k v -> Observable (Maybe v) -observeKey key ohm@(ObservableHashMap mvar) = Observable FnObservable{getValueFn, subscribeFn} +observeKey key ohm@(ObservableHashMap mvar) = Observable FnObservable{retrieveFn, observeFn} where - getValueFn :: AsyncIO (Maybe v) - getValueFn = liftIO $ join . preview (_keyHandles . at key . _Just . _value) <$> readMVar mvar - subscribeFn :: ((ObservableMessage (Maybe v) -> IO ()) -> IO Disposable) - subscribeFn callback = do + retrieveFn :: IO (Maybe v) + retrieveFn = liftIO $ join . preview (_keyHandles . at key . _Just . _value) <$> readMVar mvar + observeFn :: ((ObservableMessage (Maybe v) -> IO ()) -> IO Disposable) + observeFn callback = do subscriptionKey <- newUnique modifyKeyHandle_ (subscribeFn' subscriptionKey) key ohm pure $ synchronousDisposable (unsubscribe subscriptionKey) where subscribeFn' :: Unique -> KeyHandle v -> IO (KeyHandle v) subscribeFn' subKey keyHandle@KeyHandle{value} = do - callback (Current, value) + callback $ pure value pure $ modifyKeySubscribers (HM.insert subKey callback) keyHandle unsubscribe :: Unique -> IO () unsubscribe subKey = modifyKeyHandle_ (pure . modifyKeySubscribers (HM.delete subKey)) key ohm @@ -137,7 +137,7 @@ insert key value = modifyKeyHandleNotifying_ fn key where fn :: KeyHandle v -> IO (KeyHandle v, Maybe (Delta k v)) fn keyHandle@KeyHandle{keySubscribers} = do - mapM_ ($ (Update, Just value)) $ HM.elems keySubscribers + mapM_ ($ pure $ Just value) $ HM.elems keySubscribers pure (keyHandle{value=Just value}, Just (Insert key value)) delete :: forall k v. (Eq k, Hashable k) => k -> ObservableHashMap k v -> IO () @@ -145,7 +145,7 @@ delete key = modifyKeyHandleNotifying_ fn key where fn :: KeyHandle v -> IO (KeyHandle v, Maybe (Delta k v)) fn keyHandle@KeyHandle{value=oldValue, keySubscribers} = do - mapM_ ($ (Update, Nothing)) $ HM.elems keySubscribers + mapM_ ($ pure $ Nothing) $ HM.elems keySubscribers let delta = if isJust oldValue then Just (Delete key) else Nothing pure (keyHandle{value=Nothing}, delta) @@ -159,6 +159,6 @@ lookupDelete key = modifyKeyHandleNotifying fn key where fn :: KeyHandle v -> IO (KeyHandle v, (Maybe (Delta k v), Maybe v)) fn keyHandle@KeyHandle{value=oldValue, keySubscribers} = do - mapM_ ($ (Update, Nothing)) $ HM.elems keySubscribers + mapM_ ($ pure $ Nothing) $ HM.elems keySubscribers let delta = if isJust oldValue then Just (Delete key) else Nothing pure (keyHandle{value=Nothing}, (delta, oldValue)) diff --git a/src/Quasar/Observable/ObservablePriority.hs b/src/Quasar/Observable/ObservablePriority.hs index 9184e2e6a368afa3cbbe539a9b2d8046ca89c320..53aa6f7432b305c1a0fcace59cd9da20e76424aa 100644 --- a/src/Quasar/Observable/ObservablePriority.hs +++ b/src/Quasar/Observable/ObservablePriority.hs @@ -19,17 +19,17 @@ type Entry v = (Unique, v) newtype ObservablePriority p v = ObservablePriority (MVar (Internals p v)) instance IsRetrievable (Maybe v) (ObservablePriority p v) where - retrieve (ObservablePriority mvar) = liftIO $ getValueFromInternals <$> readMVar mvar + retrieve (ObservablePriority mvar) = liftIO $ pure . getValueFromInternals <$> readMVar mvar where getValueFromInternals :: Internals p v -> Maybe v getValueFromInternals Internals{current=Nothing} = Nothing getValueFromInternals Internals{current=Just (_, _, value)} = Just value instance IsObservable (Maybe v) (ObservablePriority p v) where - subscribe (ObservablePriority mvar) callback = do + observe (ObservablePriority mvar) callback = do key <- newUnique modifyMVar_ mvar $ \internals@Internals{subscribers} -> do -- Call listener - callback (Current, currentValue internals) + callback (pure (currentValue internals)) pure internals{subscribers = HM.insert key callback subscribers} pure $ synchronousDisposable (unsubscribe key) where @@ -119,4 +119,4 @@ insertValue (ObservablePriority mvar) priority value = modifyMVar mvar $ \intern notifySubscribers :: forall p v. Internals p v -> IO () -notifySubscribers Internals{subscribers, current} = forM_ subscribers (\callback -> callback (Update, (\(_, _, value) -> value) <$> current)) +notifySubscribers Internals{subscribers, current} = forM_ subscribers (\callback -> callback (pure ((\(_, _, value) -> value) <$> current))) diff --git a/test/Quasar/Observable/ObservableHashMapSpec.hs b/test/Quasar/Observable/ObservableHashMapSpec.hs index ae526ef4dfeb815e0afc01a8165b91063c9eb1f3..7f808db8642090854858dc5cae81bc4a497394be 100644 --- a/test/Quasar/Observable/ObservableHashMapSpec.hs +++ b/test/Quasar/Observable/ObservableHashMapSpec.hs @@ -1,11 +1,12 @@ module Quasar.Observable.ObservableHashMapSpec (spec) where +import Quasar.Awaitable import Quasar.Disposable import Quasar.Observable import Quasar.Observable.Delta import Quasar.Observable.ObservableHashMap qualified as OM -import Control.Monad (void) +import Control.Monad ((<=<), void) import Data.HashMap.Strict qualified as HM import Data.IORef import Prelude @@ -28,20 +29,22 @@ spec = parallel $ do lastCallbackValue <- newIORef undefined om <- OM.new :: IO (OM.ObservableHashMap String String) - subscriptionHandle <- subscribe om $ writeIORef lastCallbackValue - let lastCallbackShouldBe = (readIORef lastCallbackValue `shouldReturn`) + subscriptionHandle <- observe om $ writeIORef lastCallbackValue + let lastCallbackShouldBe expected = do + (ObservableUpdate update) <- readIORef lastCallbackValue + update `shouldBe` expected - lastCallbackShouldBe (Current, HM.empty) + lastCallbackShouldBe HM.empty OM.insert "key" "value" om - lastCallbackShouldBe (Update, HM.singleton "key" "value") + lastCallbackShouldBe (HM.singleton "key" "value") OM.insert "key2" "value2" om - lastCallbackShouldBe (Update, HM.fromList [("key", "value"), ("key2", "value2")]) + lastCallbackShouldBe (HM.fromList [("key", "value"), ("key2", "value2")]) disposeIO subscriptionHandle - lastCallbackShouldBe (Update, HM.fromList [("key", "value"), ("key2", "value2")]) + lastCallbackShouldBe (HM.fromList [("key", "value"), ("key2", "value2")]) OM.insert "key3" "value3" om - lastCallbackShouldBe (Update, HM.fromList [("key", "value"), ("key2", "value2")]) + lastCallbackShouldBe (HM.fromList [("key", "value"), ("key2", "value2")]) describe "subscribeDelta" $ do it "calls the callback with changes to the map" $ do @@ -83,45 +86,49 @@ spec = parallel $ do om <- OM.new :: IO (OM.ObservableHashMap String String) - void $ subscribe (OM.observeKey "key1" om) (writeIORef value1) - let v1ShouldBe = (readIORef value1 `shouldReturn`) + void $ observe (OM.observeKey "key1" om) (writeIORef value1) + let v1ShouldBe expected = do + (ObservableUpdate update) <- readIORef value1 + update `shouldBe` expected - v1ShouldBe $ (Current, Nothing) + v1ShouldBe $ Nothing OM.insert "key1" "value1" om - v1ShouldBe $ (Update, Just "value1") + v1ShouldBe $ Just "value1" OM.insert "key2" "value2" om - v1ShouldBe $ (Update, Just "value1") + v1ShouldBe $ Just "value1" - handle2 <- subscribe (OM.observeKey "key2" om) (writeIORef value2) - let v2ShouldBe = (readIORef value2 `shouldReturn`) + handle2 <- observe (OM.observeKey "key2" om) (writeIORef value2) + let v2ShouldBe expected = do + (ObservableUpdate update) <- readIORef value2 + update `shouldBe` expected - v1ShouldBe $ (Update, Just "value1") - v2ShouldBe $ (Current, Just "value2") + v1ShouldBe $ Just "value1" + v2ShouldBe $ Just "value2" OM.insert "key2" "changed" om - v1ShouldBe $ (Update, Just "value1") - v2ShouldBe $ (Update, Just "changed") + v1ShouldBe $ Just "value1" + v2ShouldBe $ Just "changed" OM.delete "key1" om - v1ShouldBe $ (Update, Nothing) - v2ShouldBe $ (Update, Just "changed") + v1ShouldBe $ Nothing + v2ShouldBe $ Just "changed" -- Delete again (should have no effect) OM.delete "key1" om - v1ShouldBe $ (Update, Nothing) - v2ShouldBe $ (Update, Just "changed") + v1ShouldBe $ Nothing + v2ShouldBe $ Just "changed" retrieveIO om `shouldReturn` HM.singleton "key2" "changed" disposeIO handle2 - OM.lookupDelete "key2" om `shouldReturn` (Just "changed") - v2ShouldBe $ (Update, Just "changed") + OM.lookupDelete "key2" om `shouldReturn` Just "changed" + v2ShouldBe $ Just "changed" OM.lookupDelete "key2" om `shouldReturn` Nothing OM.lookupDelete "key1" om `shouldReturn` Nothing - v1ShouldBe $ (Update, Nothing) + v1ShouldBe $ Nothing retrieveIO om `shouldReturn` HM.empty diff --git a/test/Quasar/Observable/ObservablePrioritySpec.hs b/test/Quasar/Observable/ObservablePrioritySpec.hs index f05d11a30fd3fefa75c80bbc51cadf83344b8b65..d500d0f7a132939cccd59c1d808d756c88d05f93 100644 --- a/test/Quasar/Observable/ObservablePrioritySpec.hs +++ b/test/Quasar/Observable/ObservablePrioritySpec.hs @@ -28,19 +28,21 @@ spec = do retrieveIO op `shouldReturn` Nothing it "sends updates when its value changes" $ do result <- newIORef [] - let mostRecentShouldBe = (head <$> readIORef result `shouldReturn`) + let mostRecentShouldBe expected = do + (ObservableUpdate x) <- (head <$> readIORef result) + x `shouldBe` expected (op :: ObservablePriority Int String) <- OP.create - _s <- subscribe op (modifyIORef result . (:)) - readIORef result `shouldReturn` [(Current, Nothing)] + _s <- observe op (modifyIORef result . (:)) + mostRecentShouldBe Nothing p2 <- OP.insertValue op 2 "p2" - mostRecentShouldBe (Update, Just "p2") + mostRecentShouldBe (Just "p2") p1 <- OP.insertValue op 1 "p1" - mostRecentShouldBe (Update, Just "p2") + mostRecentShouldBe (Just "p2") disposeIO p2 - mostRecentShouldBe (Update, Just "p1") + mostRecentShouldBe (Just "p1") disposeIO p1 - mostRecentShouldBe (Update, Nothing) + mostRecentShouldBe Nothing length <$> readIORef result `shouldReturn` 4 diff --git a/test/Quasar/ObservableSpec.hs b/test/Quasar/ObservableSpec.hs index 707a2887338a6f22950062343cd60546773bc826..01833938ac3e75c0d415864093ab19f8d6618718 100644 --- a/test/Quasar/ObservableSpec.hs +++ b/test/Quasar/ObservableSpec.hs @@ -24,14 +24,16 @@ mergeObservableSpec = do testSequence a b latestShouldBe - it "merges correctly using subscribe" $ do + it "merges correctly using observe" $ do a <- newObservableVar "" b <- newObservableVar "" - let mergedObservable = mergeObservable (\v0 v1 -> (v0, v1)) a b - (latestRef :: IORef (String, String)) <- newIORef ("", "") - void $ subscribe mergedObservable (writeIORef latestRef . snd) - let latestShouldBe = ((readIORef latestRef) `shouldReturn`) + let mergedObservable = mergeObservable (,) a b + (latestRef :: IORef (ObservableMessage (String, String))) <- newIORef (ObservableUpdate ("", "")) + void $ observe mergedObservable (writeIORef latestRef) + let latestShouldBe expected = do + (ObservableUpdate x) <- readIORef latestRef + x `shouldBe` expected testSequence a b latestShouldBe where @@ -39,18 +41,18 @@ mergeObservableSpec = do testSequence a b latestShouldBe = do latestShouldBe ("", "") - setValue a "a0" + setObservableVar a "a0" latestShouldBe ("a0", "") - setValue b "b0" + setObservableVar b "b0" latestShouldBe ("a0", "b0") - setValue a "a1" + setObservableVar a "a1" latestShouldBe ("a1", "b0") - setValue b "b1" + setObservableVar b "b1" latestShouldBe ("a1", "b1") -- No change - setValue a "a1" + setObservableVar a "a1" latestShouldBe ("a1", "b1")