From d9086e9d31c4b479ef41ad78cfaca442d4b76163 Mon Sep 17 00:00:00 2001 From: Jens Nolte <git@queezle.net> Date: Fri, 25 Feb 2022 17:52:57 +0100 Subject: [PATCH] Comment out old observable implementation (and tests) --- src/Quasar/Observable.hs | 824 +++++++++--------- src/Quasar/Observable/Delta.hs | 126 +-- src/Quasar/Observable/ObservableHashMap.hs | 308 +++---- src/Quasar/Observable/ObservablePriority.hs | 244 +++--- .../Observable/ObservableHashMapSpec.hs | 239 ++--- .../Observable/ObservablePrioritySpec.hs | 85 +- test/Quasar/ObservableSpec.hs | 119 +-- 7 files changed, 974 insertions(+), 971 deletions(-) diff --git a/src/Quasar/Observable.hs b/src/Quasar/Observable.hs index 120051f..ef29c6b 100644 --- a/src/Quasar/Observable.hs +++ b/src/Quasar/Observable.hs @@ -1,418 +1,418 @@ {-# LANGUAGE ViewPatterns #-} module Quasar.Observable ( - -- * Observable core types - IsRetrievable(..), - IsObservable(..), - Observable(..), - ObservableMessage(..), - toObservableUpdate, - - -- * ObservableVar - ObservableVar, - newObservableVar, - setObservableVar, - modifyObservableVar, - stateObservableVar, - - -- * Helper functions - observeWhile, - observeWhile_, - observeBlocking, - fnObservable, - synchronousFnObservable, - - -- * Helper types - ObservableCallback, + ---- * Observable core types + --IsRetrievable(..), + --IsObservable(..), + --Observable(..), + --ObservableMessage(..), + --toObservableUpdate, + + ---- * ObservableVar + --ObservableVar, + --newObservableVar, + --setObservableVar, + --modifyObservableVar, + --stateObservableVar, + + ---- * Helper functions + --observeWhile, + --observeWhile_, + --observeBlocking, + --fnObservable, + --synchronousFnObservable, + + ---- * Helper types + --ObservableCallback, ) where -import Control.Applicative -import Control.Concurrent.MVar -import Control.Concurrent.STM -import Control.Monad.Catch -import Control.Monad.Except -import Control.Monad.Trans.Maybe -import Data.HashMap.Strict qualified as HM -import Data.IORef -import Data.Unique -import Quasar.Awaitable -import Quasar.Disposable -import Quasar.Prelude -import Quasar.ResourceManager - -data ObservableMessage a - = ObservableUpdate a - | ObservableLoading - | ObservableNotAvailable SomeException - deriving stock (Show, Generic) - -instance Functor ObservableMessage where - fmap fn (ObservableUpdate x) = ObservableUpdate (fn x) - fmap _ ObservableLoading = ObservableLoading - fmap _ (ObservableNotAvailable ex) = ObservableNotAvailable ex - -instance Applicative ObservableMessage where - pure = ObservableUpdate - liftA2 fn (ObservableUpdate x) (ObservableUpdate y) = ObservableUpdate (fn x y) - liftA2 _ (ObservableNotAvailable ex) _ = ObservableNotAvailable ex - liftA2 _ ObservableLoading _ = ObservableLoading - liftA2 _ _ (ObservableNotAvailable ex) = ObservableNotAvailable ex - liftA2 _ _ ObservableLoading = ObservableLoading - -instance Monad ObservableMessage where - (ObservableUpdate x) >>= fn = fn x - ObservableLoading >>= _ = ObservableLoading - (ObservableNotAvailable ex) >>= _ = ObservableNotAvailable ex - - -toObservableUpdate :: MonadThrow m => ObservableMessage a -> m (Maybe a) -toObservableUpdate (ObservableUpdate value) = pure $ Just value -toObservableUpdate ObservableLoading = pure Nothing -toObservableUpdate (ObservableNotAvailable ex) = throwM ex - - -class IsRetrievable v a | a -> v where - retrieve :: (MonadResourceManager m, MonadIO m, MonadMask m) => a -> m (Awaitable v) - -class IsRetrievable v o => IsObservable v o | o -> v where - -- | Register a callback to observe changes. The callback is called when the value changes, but depending on the - -- delivery method (e.g. network) intermediate values may be skipped. - -- - -- A correct implementation of observe must call the callback during registration (if no value is available - -- immediately an `ObservableLoading` will be delivered). - -- - -- The callback must return without blocking, otherwise other callbacks will be delayed. If the value can't be - -- processed immediately, use `observeBlocking` instead or manually pass the value to a thread that processes the - -- data, e.g. by using STM. - observe - :: (MonadResourceManager m, MonadIO m, MonadMask m) - => o -- ^ observable - -> (ObservableMessage v -> ResourceManagerIO ()) -- ^ callback - -> m () - observe observable = observe (toObservable observable) - - toObservable :: o -> Observable v - toObservable = Observable - - mapObservable :: (v -> a) -> o -> Observable a - mapObservable f = Observable . MappedObservable f - - {-# MINIMAL toObservable | observe #-} - - --- | Observe an observable by handling updates on the current thread. --- --- `observeBlocking` will run the handler whenever the observable changes (forever / until an exception is encountered). --- --- The handler is allowed to block. When the value changes while the handler is running the handler will be run again --- after it completes; when the value changes multiple times it will only be executed once (with the latest value). -observeBlocking - :: (IsObservable v o, MonadResourceManager m, MonadIO m, MonadMask m) - => o - -> (ObservableMessage v -> m ()) - -> m a -observeBlocking observable handler = do - -- `withScopedResourceManager` removes the `observe` callback when the `handler` fails. - withScopedResourceManager do - var <- liftIO newEmptyTMVarIO - observe observable \msg -> liftIO $ atomically do - void $ tryTakeTMVar var - putTMVar var msg - - forever do - msg <- liftIO $ atomically $ takeTMVar var - handler msg - - --- | Internal control flow exception for `observeWhile` and `observeWhile_`. -data ObserveWhileCompleted = ObserveWhileCompleted - deriving stock (Eq, Show) - -instance Exception ObserveWhileCompleted - --- | Observe until the callback returns `Just`. -observeWhile - :: (IsObservable v o, MonadResourceManager m, MonadIO m, MonadMask m) - => o - -> (ObservableMessage v -> m (Maybe a)) - -> m a -observeWhile observable callback = do - resultVar <- liftIO $ newIORef unreachableCodePath - observeWhile_ observable \msg -> do - callback msg >>= \case - Just result -> do - liftIO $ writeIORef resultVar result - pure False - Nothing -> pure True - - liftIO $ readIORef resultVar - - --- | Observe until the callback returns `False`. -observeWhile_ - :: (IsObservable v o, MonadResourceManager m, MonadIO m, MonadMask m) - => o - -> (ObservableMessage v -> m Bool) - -> m () -observeWhile_ observable callback = - catch - do - observeBlocking observable \msg -> do - continue <- callback msg - unless continue $ throwM ObserveWhileCompleted - \ObserveWhileCompleted -> pure () - - -type ObservableCallback v = ObservableMessage v -> IO () - - --- | Existential quantification wrapper for the IsObservable type class. -data Observable v = forall o. IsObservable v o => Observable o -instance IsRetrievable v (Observable v) where - retrieve (Observable o) = retrieve o -instance IsObservable v (Observable v) where - observe (Observable o) = observe o - toObservable = id - mapObservable f (Observable o) = mapObservable f o - -instance Functor Observable where - fmap f = mapObservable f - -instance Applicative Observable where - pure = toObservable . ConstObservable - liftA2 fn x y = toObservable $ LiftA2Observable fn x y - -instance Monad Observable where - x >>= y = toObservable $ BindObservable x y - -instance MonadThrow Observable where - throwM :: forall e v. Exception e => e -> Observable v - throwM = toObservable . FailedObservable @v . toException - -instance MonadCatch Observable where - catch action handler = toObservable $ CatchObservable action handler - -instance MonadFail Observable where - fail = throwM . userError - -instance Alternative Observable where - empty = fail "empty" - x <|> y = x `catchAll` const y - -instance MonadPlus Observable - - - -data MappedObservable b = forall a o. IsObservable a o => MappedObservable (a -> b) o -instance IsRetrievable v (MappedObservable v) where - retrieve (MappedObservable f observable) = f <<$>> retrieve observable -instance IsObservable v (MappedObservable v) where - observe (MappedObservable fn observable) callback = observe observable (callback . fmap fn) - mapObservable f1 (MappedObservable f2 upstream) = Observable $ MappedObservable (f1 . f2) upstream - - - -data BindObservable r = forall a. BindObservable (Observable a) (a -> Observable r) - -instance IsRetrievable r (BindObservable r) where - retrieve (BindObservable fx fn) = do - awaitable <- retrieve fx - value <- liftIO $ await awaitable - retrieve $ fn value - -instance IsObservable r (BindObservable r) where - observe (BindObservable fx fn) callback = do - disposableVar <- liftIO $ newTMVarIO noDisposable - keyVar <- liftIO $ newTMVarIO =<< newUnique - - observe fx (leftCallback disposableVar keyVar) - where - leftCallback disposableVar keyVar message = do - key <- liftIO newUnique - - oldDisposable <- liftIO $ atomically do - -- Blocks while `rightCallback` is running - void $ swapTMVar keyVar key - - takeTMVar disposableVar - - disposeEventually_ oldDisposable - - disposable <- case message of - (ObservableUpdate x) -> captureDisposable_ $ observe (fn x) (rightCallback keyVar key) - ObservableLoading -> noDisposable <$ callback ObservableLoading - (ObservableNotAvailable ex) -> noDisposable <$ callback (ObservableNotAvailable ex) - - liftIO $ atomically $ putTMVar disposableVar disposable - - rightCallback :: TMVar Unique -> Unique -> ObservableMessage r -> ResourceManagerIO () - rightCallback keyVar key message = - bracket - -- Take key var to prevent parallel callbacks - (liftIO $ atomically $ takeTMVar keyVar) - -- Put key back - (liftIO . atomically . putTMVar keyVar) - -- Ignore all callbacks that arrive from the old `fn` when a new `fx` has been observed - (\currentKey -> when (key == currentKey) $ callback message) - - -data CatchObservable e r = Exception e => CatchObservable (Observable r) (e -> Observable r) - -instance IsRetrievable r (CatchObservable e r) where - retrieve (CatchObservable fx fn) = retrieve fx `catch` \ex -> retrieve (fn ex) - -instance IsObservable r (CatchObservable e r) where - observe (CatchObservable fx fn) callback = do - disposableVar <- liftIO $ newTMVarIO noDisposable - keyVar <- liftIO $ newTMVarIO =<< newUnique - - observe fx (leftCallback disposableVar keyVar) - where - leftCallback disposableVar keyVar message = do - key <- liftIO newUnique - - oldDisposable <- liftIO $ atomically do - -- Blocks while `rightCallback` is running - void $ swapTMVar keyVar key - - takeTMVar disposableVar - - disposeEventually_ oldDisposable - - disposable <- case message of - (ObservableNotAvailable (fromException -> Just ex)) -> - captureDisposable_ $ observe (fn ex) (rightCallback keyVar key) - msg -> noDisposable <$ callback msg - - liftIO $ atomically $ putTMVar disposableVar disposable - - rightCallback :: TMVar Unique -> Unique -> ObservableMessage r -> ResourceManagerIO () - rightCallback keyVar key message = - bracket - -- Take key var to prevent parallel callbacks - (liftIO $ atomically $ takeTMVar keyVar) - -- Put key back - (liftIO . atomically . putTMVar keyVar) - -- Ignore all callbacks that arrive from the old `fn` when a new `fx` has been observed - (\currentKey -> when (key == currentKey) $ callback message) - - - -newtype ObservableVar v = ObservableVar (MVar (v, HM.HashMap Unique (ObservableCallback v))) -instance IsRetrievable v (ObservableVar v) where - retrieve (ObservableVar mvar) = liftIO $ pure . fst <$> readMVar mvar -instance IsObservable v (ObservableVar v) where - observe (ObservableVar mvar) callback = do - resourceManager <- askResourceManager - - registerNewResource_ $ liftIO do - let wrappedCallback = enterResourceManager resourceManager . callback - - key <- liftIO newUnique - - modifyMVar_ mvar $ \(state, subscribers) -> do - -- Call listener with initial value - wrappedCallback (pure state) - pure (state, HM.insert key wrappedCallback subscribers) - - atomically $ newDisposable $ disposeFn key - where - disposeFn :: Unique -> IO () - disposeFn key = modifyMVar_ mvar (\(state, subscribers) -> pure (state, HM.delete key subscribers)) - -newObservableVar :: MonadIO m => v -> m (ObservableVar v) -newObservableVar initialValue = liftIO do - ObservableVar <$> newMVar (initialValue, HM.empty) - -setObservableVar :: MonadIO m => ObservableVar v -> v -> m () -setObservableVar observable value = modifyObservableVar observable (const value) - -stateObservableVar :: MonadIO m => ObservableVar v -> (v -> (a, v)) -> m a -stateObservableVar (ObservableVar mvar) f = - liftIO $ modifyMVar mvar $ \(oldState, subscribers) -> do - let (result, newState) = f oldState - mapM_ (\callback -> callback (pure newState)) subscribers - pure ((newState, subscribers), result) - -modifyObservableVar :: MonadIO m => ObservableVar v -> (v -> v) -> m () -modifyObservableVar observable f = stateObservableVar observable (((), ) . f) - - - --- | Merge two observables using a given merge function. Whenever one of the inputs is updated, the resulting --- observable updates according to the merge function. +--import Control.Applicative +--import Control.Concurrent.MVar +--import Control.Concurrent.STM +--import Control.Monad.Catch +--import Control.Monad.Except +--import Control.Monad.Trans.Maybe +--import Data.HashMap.Strict qualified as HM +--import Data.IORef +--import Data.Unique +--import Quasar.Awaitable +--import Quasar.Disposable +--import Quasar.Prelude +--import Quasar.ResourceManager -- --- There is no caching involed, every subscriber effectively subscribes to both input observables. -data LiftA2Observable r = forall r0 r1. LiftA2Observable (r0 -> r1 -> r) (Observable r0) (Observable r1) - -instance IsRetrievable r (LiftA2Observable r) where - retrieve (LiftA2Observable fn fx fy) = - liftA2 (liftA2 fn) (retrieve fx) (retrieve fy) - -instance IsObservable r (LiftA2Observable r) where - observe (LiftA2Observable fn fx fy) callback = do - var0 <- liftIO $ newTVarIO Nothing - var1 <- liftIO $ newTVarIO Nothing - observe fx (mergeCallback var0 var1 . writeTVar var0 . Just) - observe fy (mergeCallback var0 var1 . writeTVar var1 . Just) - where - mergeCallback var0 var1 update = do - mMerged <- liftIO $ atomically do - update - runMaybeT $ liftA2 (liftA2 fn) (MaybeT (readTVar var0)) (MaybeT (readTVar var1)) - - -- Run the callback only once both values have been received - mapM_ callback mMerged - - -data FnObservable v = FnObservable { - retrieveFn :: ResourceManagerIO (Awaitable v), - observeFn :: (ObservableMessage v -> ResourceManagerIO ()) -> ResourceManagerIO () -} -instance IsRetrievable v (FnObservable v) where - retrieve FnObservable{retrieveFn} = liftResourceManagerIO retrieveFn -instance IsObservable v (FnObservable v) where - observe FnObservable{observeFn} callback = liftResourceManagerIO $ observeFn callback - mapObservable f FnObservable{retrieveFn, observeFn} = Observable $ FnObservable { - retrieveFn = f <<$>> retrieveFn, - observeFn = \listener -> observeFn (listener . fmap f) - } - --- | Implement an Observable by directly providing functions for `retrieve` and `subscribe`. -fnObservable - :: ((ObservableMessage v -> ResourceManagerIO ()) -> ResourceManagerIO ()) - -> ResourceManagerIO (Awaitable v) - -> Observable v -fnObservable observeFn retrieveFn = toObservable FnObservable{observeFn, retrieveFn} - --- | Implement an Observable by directly providing functions for `retrieve` and `subscribe`. -synchronousFnObservable - :: forall v. - ((ObservableMessage v -> ResourceManagerIO ()) -> ResourceManagerIO ()) - -> IO v - -> Observable v -synchronousFnObservable observeFn synchronousRetrieveFn = fnObservable observeFn retrieveFn - where - retrieveFn :: ResourceManagerIO (Awaitable v) - retrieveFn = liftIO $ pure <$> synchronousRetrieveFn - - -newtype ConstObservable v = ConstObservable v -instance IsRetrievable v (ConstObservable v) where - retrieve (ConstObservable x) = pure $ pure x -instance IsObservable v (ConstObservable v) where - observe (ConstObservable x) callback = do - liftResourceManagerIO $ callback $ ObservableUpdate x - - -newtype FailedObservable v = FailedObservable SomeException -instance IsRetrievable v (FailedObservable v) where - retrieve (FailedObservable ex) = liftIO $ throwIO ex -instance IsObservable v (FailedObservable v) where - observe (FailedObservable ex) callback = do - liftResourceManagerIO $ callback $ ObservableNotAvailable ex - - --- TODO implement ---cacheObservable :: IsObservable v o => o -> Observable v ---cacheObservable = undefined +--data ObservableMessage a +-- = ObservableUpdate a +-- | ObservableLoading +-- | ObservableNotAvailable SomeException +-- deriving stock (Show, Generic) +-- +--instance Functor ObservableMessage where +-- fmap fn (ObservableUpdate x) = ObservableUpdate (fn x) +-- fmap _ ObservableLoading = ObservableLoading +-- fmap _ (ObservableNotAvailable ex) = ObservableNotAvailable ex +-- +--instance Applicative ObservableMessage where +-- pure = ObservableUpdate +-- liftA2 fn (ObservableUpdate x) (ObservableUpdate y) = ObservableUpdate (fn x y) +-- liftA2 _ (ObservableNotAvailable ex) _ = ObservableNotAvailable ex +-- liftA2 _ ObservableLoading _ = ObservableLoading +-- liftA2 _ _ (ObservableNotAvailable ex) = ObservableNotAvailable ex +-- liftA2 _ _ ObservableLoading = ObservableLoading +-- +--instance Monad ObservableMessage where +-- (ObservableUpdate x) >>= fn = fn x +-- ObservableLoading >>= _ = ObservableLoading +-- (ObservableNotAvailable ex) >>= _ = ObservableNotAvailable ex +-- +-- +--toObservableUpdate :: MonadThrow m => ObservableMessage a -> m (Maybe a) +--toObservableUpdate (ObservableUpdate value) = pure $ Just value +--toObservableUpdate ObservableLoading = pure Nothing +--toObservableUpdate (ObservableNotAvailable ex) = throwM ex +-- +-- +--class IsRetrievable v a | a -> v where +-- retrieve :: (MonadResourceManager m, MonadIO m, MonadMask m) => a -> m (Awaitable v) +-- +--class IsRetrievable v o => IsObservable v o | o -> v where +-- -- | Register a callback to observe changes. The callback is called when the value changes, but depending on the +-- -- delivery method (e.g. network) intermediate values may be skipped. +-- -- +-- -- A correct implementation of observe must call the callback during registration (if no value is available +-- -- immediately an `ObservableLoading` will be delivered). +-- -- +-- -- The callback must return without blocking, otherwise other callbacks will be delayed. If the value can't be +-- -- processed immediately, use `observeBlocking` instead or manually pass the value to a thread that processes the +-- -- data, e.g. by using STM. +-- observe +-- :: (MonadResourceManager m, MonadIO m, MonadMask m) +-- => o -- ^ observable +-- -> (ObservableMessage v -> ResourceManagerIO ()) -- ^ callback +-- -> m () +-- observe observable = observe (toObservable observable) +-- +-- toObservable :: o -> Observable v +-- toObservable = Observable +-- +-- mapObservable :: (v -> a) -> o -> Observable a +-- mapObservable f = Observable . MappedObservable f +-- +-- {-# MINIMAL toObservable | observe #-} +-- +-- +---- | Observe an observable by handling updates on the current thread. +---- +---- `observeBlocking` will run the handler whenever the observable changes (forever / until an exception is encountered). +---- +---- The handler is allowed to block. When the value changes while the handler is running the handler will be run again +---- after it completes; when the value changes multiple times it will only be executed once (with the latest value). +--observeBlocking +-- :: (IsObservable v o, MonadResourceManager m, MonadIO m, MonadMask m) +-- => o +-- -> (ObservableMessage v -> m ()) +-- -> m a +--observeBlocking observable handler = do +-- -- `withScopedResourceManager` removes the `observe` callback when the `handler` fails. +-- withScopedResourceManager do +-- var <- liftIO newEmptyTMVarIO +-- observe observable \msg -> liftIO $ atomically do +-- void $ tryTakeTMVar var +-- putTMVar var msg +-- +-- forever do +-- msg <- liftIO $ atomically $ takeTMVar var +-- handler msg +-- +-- +---- | Internal control flow exception for `observeWhile` and `observeWhile_`. +--data ObserveWhileCompleted = ObserveWhileCompleted +-- deriving stock (Eq, Show) +-- +--instance Exception ObserveWhileCompleted +-- +---- | Observe until the callback returns `Just`. +--observeWhile +-- :: (IsObservable v o, MonadResourceManager m, MonadIO m, MonadMask m) +-- => o +-- -> (ObservableMessage v -> m (Maybe a)) +-- -> m a +--observeWhile observable callback = do +-- resultVar <- liftIO $ newIORef unreachableCodePath +-- observeWhile_ observable \msg -> do +-- callback msg >>= \case +-- Just result -> do +-- liftIO $ writeIORef resultVar result +-- pure False +-- Nothing -> pure True +-- +-- liftIO $ readIORef resultVar +-- +-- +---- | Observe until the callback returns `False`. +--observeWhile_ +-- :: (IsObservable v o, MonadResourceManager m, MonadIO m, MonadMask m) +-- => o +-- -> (ObservableMessage v -> m Bool) +-- -> m () +--observeWhile_ observable callback = +-- catch +-- do +-- observeBlocking observable \msg -> do +-- continue <- callback msg +-- unless continue $ throwM ObserveWhileCompleted +-- \ObserveWhileCompleted -> pure () +-- +-- +--type ObservableCallback v = ObservableMessage v -> IO () +-- +-- +---- | Existential quantification wrapper for the IsObservable type class. +--data Observable v = forall o. IsObservable v o => Observable o +--instance IsRetrievable v (Observable v) where +-- retrieve (Observable o) = retrieve o +--instance IsObservable v (Observable v) where +-- observe (Observable o) = observe o +-- toObservable = id +-- mapObservable f (Observable o) = mapObservable f o +-- +--instance Functor Observable where +-- fmap f = mapObservable f +-- +--instance Applicative Observable where +-- pure = toObservable . ConstObservable +-- liftA2 fn x y = toObservable $ LiftA2Observable fn x y +-- +--instance Monad Observable where +-- x >>= y = toObservable $ BindObservable x y +-- +--instance MonadThrow Observable where +-- throwM :: forall e v. Exception e => e -> Observable v +-- throwM = toObservable . FailedObservable @v . toException +-- +--instance MonadCatch Observable where +-- catch action handler = toObservable $ CatchObservable action handler +-- +--instance MonadFail Observable where +-- fail = throwM . userError +-- +--instance Alternative Observable where +-- empty = fail "empty" +-- x <|> y = x `catchAll` const y +-- +--instance MonadPlus Observable +-- +-- +-- +--data MappedObservable b = forall a o. IsObservable a o => MappedObservable (a -> b) o +--instance IsRetrievable v (MappedObservable v) where +-- retrieve (MappedObservable f observable) = f <<$>> retrieve observable +--instance IsObservable v (MappedObservable v) where +-- observe (MappedObservable fn observable) callback = observe observable (callback . fmap fn) +-- mapObservable f1 (MappedObservable f2 upstream) = Observable $ MappedObservable (f1 . f2) upstream +-- +-- +-- +--data BindObservable r = forall a. BindObservable (Observable a) (a -> Observable r) +-- +--instance IsRetrievable r (BindObservable r) where +-- retrieve (BindObservable fx fn) = do +-- awaitable <- retrieve fx +-- value <- liftIO $ await awaitable +-- retrieve $ fn value +-- +--instance IsObservable r (BindObservable r) where +-- observe (BindObservable fx fn) callback = do +-- disposableVar <- liftIO $ newTMVarIO noDisposable +-- keyVar <- liftIO $ newTMVarIO =<< newUnique +-- +-- observe fx (leftCallback disposableVar keyVar) +-- where +-- leftCallback disposableVar keyVar message = do +-- key <- liftIO newUnique +-- +-- oldDisposable <- liftIO $ atomically do +-- -- Blocks while `rightCallback` is running +-- void $ swapTMVar keyVar key +-- +-- takeTMVar disposableVar +-- +-- disposeEventually_ oldDisposable +-- +-- disposable <- case message of +-- (ObservableUpdate x) -> captureDisposable_ $ observe (fn x) (rightCallback keyVar key) +-- ObservableLoading -> noDisposable <$ callback ObservableLoading +-- (ObservableNotAvailable ex) -> noDisposable <$ callback (ObservableNotAvailable ex) +-- +-- liftIO $ atomically $ putTMVar disposableVar disposable +-- +-- rightCallback :: TMVar Unique -> Unique -> ObservableMessage r -> ResourceManagerIO () +-- rightCallback keyVar key message = +-- bracket +-- -- Take key var to prevent parallel callbacks +-- (liftIO $ atomically $ takeTMVar keyVar) +-- -- Put key back +-- (liftIO . atomically . putTMVar keyVar) +-- -- Ignore all callbacks that arrive from the old `fn` when a new `fx` has been observed +-- (\currentKey -> when (key == currentKey) $ callback message) +-- +-- +--data CatchObservable e r = Exception e => CatchObservable (Observable r) (e -> Observable r) +-- +--instance IsRetrievable r (CatchObservable e r) where +-- retrieve (CatchObservable fx fn) = retrieve fx `catch` \ex -> retrieve (fn ex) +-- +--instance IsObservable r (CatchObservable e r) where +-- observe (CatchObservable fx fn) callback = do +-- disposableVar <- liftIO $ newTMVarIO noDisposable +-- keyVar <- liftIO $ newTMVarIO =<< newUnique +-- +-- observe fx (leftCallback disposableVar keyVar) +-- where +-- leftCallback disposableVar keyVar message = do +-- key <- liftIO newUnique +-- +-- oldDisposable <- liftIO $ atomically do +-- -- Blocks while `rightCallback` is running +-- void $ swapTMVar keyVar key +-- +-- takeTMVar disposableVar +-- +-- disposeEventually_ oldDisposable +-- +-- disposable <- case message of +-- (ObservableNotAvailable (fromException -> Just ex)) -> +-- captureDisposable_ $ observe (fn ex) (rightCallback keyVar key) +-- msg -> noDisposable <$ callback msg +-- +-- liftIO $ atomically $ putTMVar disposableVar disposable +-- +-- rightCallback :: TMVar Unique -> Unique -> ObservableMessage r -> ResourceManagerIO () +-- rightCallback keyVar key message = +-- bracket +-- -- Take key var to prevent parallel callbacks +-- (liftIO $ atomically $ takeTMVar keyVar) +-- -- Put key back +-- (liftIO . atomically . putTMVar keyVar) +-- -- Ignore all callbacks that arrive from the old `fn` when a new `fx` has been observed +-- (\currentKey -> when (key == currentKey) $ callback message) +-- +-- +-- +--newtype ObservableVar v = ObservableVar (MVar (v, HM.HashMap Unique (ObservableCallback v))) +--instance IsRetrievable v (ObservableVar v) where +-- retrieve (ObservableVar mvar) = liftIO $ pure . fst <$> readMVar mvar +--instance IsObservable v (ObservableVar v) where +-- observe (ObservableVar mvar) callback = do +-- resourceManager <- askResourceManager +-- +-- registerNewResource_ $ liftIO do +-- let wrappedCallback = enterResourceManager resourceManager . callback +-- +-- key <- liftIO newUnique +-- +-- modifyMVar_ mvar $ \(state, subscribers) -> do +-- -- Call listener with initial value +-- wrappedCallback (pure state) +-- pure (state, HM.insert key wrappedCallback subscribers) +-- +-- atomically $ newDisposable $ disposeFn key +-- where +-- disposeFn :: Unique -> IO () +-- disposeFn key = modifyMVar_ mvar (\(state, subscribers) -> pure (state, HM.delete key subscribers)) +-- +--newObservableVar :: MonadIO m => v -> m (ObservableVar v) +--newObservableVar initialValue = liftIO do +-- ObservableVar <$> newMVar (initialValue, HM.empty) +-- +--setObservableVar :: MonadIO m => ObservableVar v -> v -> m () +--setObservableVar observable value = modifyObservableVar observable (const value) +-- +--stateObservableVar :: MonadIO m => ObservableVar v -> (v -> (a, v)) -> m a +--stateObservableVar (ObservableVar mvar) f = +-- liftIO $ modifyMVar mvar $ \(oldState, subscribers) -> do +-- let (result, newState) = f oldState +-- mapM_ (\callback -> callback (pure newState)) subscribers +-- pure ((newState, subscribers), result) +-- +--modifyObservableVar :: MonadIO m => ObservableVar v -> (v -> v) -> m () +--modifyObservableVar observable f = stateObservableVar observable (((), ) . f) +-- +-- +-- +---- | Merge two observables using a given merge function. Whenever one of the inputs is updated, the resulting +---- observable updates according to the merge function. +---- +---- There is no caching involed, every subscriber effectively subscribes to both input observables. +--data LiftA2Observable r = forall r0 r1. LiftA2Observable (r0 -> r1 -> r) (Observable r0) (Observable r1) +-- +--instance IsRetrievable r (LiftA2Observable r) where +-- retrieve (LiftA2Observable fn fx fy) = +-- liftA2 (liftA2 fn) (retrieve fx) (retrieve fy) +-- +--instance IsObservable r (LiftA2Observable r) where +-- observe (LiftA2Observable fn fx fy) callback = do +-- var0 <- liftIO $ newTVarIO Nothing +-- var1 <- liftIO $ newTVarIO Nothing +-- observe fx (mergeCallback var0 var1 . writeTVar var0 . Just) +-- observe fy (mergeCallback var0 var1 . writeTVar var1 . Just) +-- where +-- mergeCallback var0 var1 update = do +-- mMerged <- liftIO $ atomically do +-- update +-- runMaybeT $ liftA2 (liftA2 fn) (MaybeT (readTVar var0)) (MaybeT (readTVar var1)) +-- +-- -- Run the callback only once both values have been received +-- mapM_ callback mMerged +-- +-- +--data FnObservable v = FnObservable { +-- retrieveFn :: ResourceManagerIO (Awaitable v), +-- observeFn :: (ObservableMessage v -> ResourceManagerIO ()) -> ResourceManagerIO () +--} +--instance IsRetrievable v (FnObservable v) where +-- retrieve FnObservable{retrieveFn} = liftResourceManagerIO retrieveFn +--instance IsObservable v (FnObservable v) where +-- observe FnObservable{observeFn} callback = liftResourceManagerIO $ observeFn callback +-- mapObservable f FnObservable{retrieveFn, observeFn} = Observable $ FnObservable { +-- retrieveFn = f <<$>> retrieveFn, +-- observeFn = \listener -> observeFn (listener . fmap f) +-- } +-- +---- | Implement an Observable by directly providing functions for `retrieve` and `subscribe`. +--fnObservable +-- :: ((ObservableMessage v -> ResourceManagerIO ()) -> ResourceManagerIO ()) +-- -> ResourceManagerIO (Awaitable v) +-- -> Observable v +--fnObservable observeFn retrieveFn = toObservable FnObservable{observeFn, retrieveFn} +-- +---- | Implement an Observable by directly providing functions for `retrieve` and `subscribe`. +--synchronousFnObservable +-- :: forall v. +-- ((ObservableMessage v -> ResourceManagerIO ()) -> ResourceManagerIO ()) +-- -> IO v +-- -> Observable v +--synchronousFnObservable observeFn synchronousRetrieveFn = fnObservable observeFn retrieveFn +-- where +-- retrieveFn :: ResourceManagerIO (Awaitable v) +-- retrieveFn = liftIO $ pure <$> synchronousRetrieveFn +-- +-- +--newtype ConstObservable v = ConstObservable v +--instance IsRetrievable v (ConstObservable v) where +-- retrieve (ConstObservable x) = pure $ pure x +--instance IsObservable v (ConstObservable v) where +-- observe (ConstObservable x) callback = do +-- liftResourceManagerIO $ callback $ ObservableUpdate x +-- +-- +--newtype FailedObservable v = FailedObservable SomeException +--instance IsRetrievable v (FailedObservable v) where +-- retrieve (FailedObservable ex) = liftIO $ throwIO ex +--instance IsObservable v (FailedObservable v) where +-- observe (FailedObservable ex) callback = do +-- liftResourceManagerIO $ callback $ ObservableNotAvailable ex +-- +-- +---- TODO implement +----cacheObservable :: IsObservable v o => o -> Observable v +----cacheObservable = undefined diff --git a/src/Quasar/Observable/Delta.hs b/src/Quasar/Observable/Delta.hs index be92b74..dbf5450 100644 --- a/src/Quasar/Observable/Delta.hs +++ b/src/Quasar/Observable/Delta.hs @@ -1,66 +1,66 @@ module Quasar.Observable.Delta ( - IsDeltaObservable(..), - Delta(..), - DeltaObservable, + --IsDeltaObservable(..), + --Delta(..), + --DeltaObservable, ) where -import Data.Binary (Binary) -import Data.Binary qualified as B -import Data.HashMap.Strict qualified as HM -import Quasar.Disposable -import Quasar.Observable -import Quasar.Prelude - -data Delta k v = Reset (HM.HashMap k v) | Insert k v | Delete k - deriving stock (Eq, Show, Generic) -instance Functor (Delta k) where - fmap f (Reset state) = Reset (f <$> state) - fmap f (Insert key value) = Insert key (f value) - fmap _ (Delete key) = Delete key -instance (Eq k, Hashable k, Binary k, Binary v) => Binary (Delta k v) where - get = do - (tag :: Word8) <- B.get - case tag of - 0 -> Reset . HM.fromList <$> B.get - 1 -> Insert <$> B.get <*> B.get - 2 -> Delete <$> B.get - _ -> fail "Invalid tag" - put (Reset hashmap) = B.put (0 :: Word8) >> B.put (HM.toList hashmap) - put (Insert key value) = B.put (1 :: Word8) >> B.put key >> B.put value - put (Delete key) = B.put (2 :: Word8) >> B.put key - -class IsObservable (HM.HashMap k v) o => IsDeltaObservable k v o | o -> k, o -> v where - -- TODO change signature to use resource manager - subscribeDelta :: MonadIO m => o -> (Delta k v -> IO ()) -> m Disposable - ---observeHashMapDefaultImpl :: forall k v o. (Eq k, Hashable k) => IsDeltaObservable k v o => o -> (HM.HashMap k v -> IO ()) -> IO Disposable ---observeHashMapDefaultImpl o callback = do --- hashMapRef <- newIORef HM.empty --- subscribeDelta o (deltaCallback hashMapRef) --- where --- deltaCallback :: IORef (HM.HashMap k v) -> Delta k v -> IO () --- deltaCallback hashMapRef delta = callback =<< atomicModifyIORef' hashMapRef ((\x -> (x, x)) . applyDelta delta) --- applyDelta :: Delta k v -> HM.HashMap k v -> HM.HashMap k v --- applyDelta (Reset state) = const state --- applyDelta (Insert key value) = HM.insert key value --- applyDelta (Delete key) = HM.delete key - - -data DeltaObservable k v = forall o. IsDeltaObservable k v o => DeltaObservable o -instance IsRetrievable (HM.HashMap k v) (DeltaObservable k v) where - retrieve (DeltaObservable o) = retrieve o -instance IsObservable (HM.HashMap k v) (DeltaObservable k v) where - observe (DeltaObservable o) = observe o -instance IsDeltaObservable k v (DeltaObservable k v) where - subscribeDelta (DeltaObservable o) = subscribeDelta o -instance Functor (DeltaObservable k) where - fmap f (DeltaObservable o) = DeltaObservable $ MappedDeltaObservable f o - - -data MappedDeltaObservable k b = forall a o. IsDeltaObservable k a o => MappedDeltaObservable (a -> b) o -instance IsRetrievable (HM.HashMap k b) (MappedDeltaObservable k b) where - retrieve (MappedDeltaObservable f o) = fmap f <<$>> retrieve o -instance IsObservable (HM.HashMap k b) (MappedDeltaObservable k b) where - observe (MappedDeltaObservable f o) callback = observe o (callback . fmap (fmap f)) -instance IsDeltaObservable k b (MappedDeltaObservable k b) where - subscribeDelta (MappedDeltaObservable f o) callback = subscribeDelta o (callback . fmap f) +--import Data.Binary (Binary) +--import Data.Binary qualified as B +--import Data.HashMap.Strict qualified as HM +--import Quasar.Disposable +--import Quasar.Observable +--import Quasar.Prelude +-- +--data Delta k v = Reset (HM.HashMap k v) | Insert k v | Delete k +-- deriving stock (Eq, Show, Generic) +--instance Functor (Delta k) where +-- fmap f (Reset state) = Reset (f <$> state) +-- fmap f (Insert key value) = Insert key (f value) +-- fmap _ (Delete key) = Delete key +--instance (Eq k, Hashable k, Binary k, Binary v) => Binary (Delta k v) where +-- get = do +-- (tag :: Word8) <- B.get +-- case tag of +-- 0 -> Reset . HM.fromList <$> B.get +-- 1 -> Insert <$> B.get <*> B.get +-- 2 -> Delete <$> B.get +-- _ -> fail "Invalid tag" +-- put (Reset hashmap) = B.put (0 :: Word8) >> B.put (HM.toList hashmap) +-- put (Insert key value) = B.put (1 :: Word8) >> B.put key >> B.put value +-- put (Delete key) = B.put (2 :: Word8) >> B.put key +-- +--class IsObservable (HM.HashMap k v) o => IsDeltaObservable k v o | o -> k, o -> v where +-- -- TODO change signature to use resource manager +-- subscribeDelta :: MonadIO m => o -> (Delta k v -> IO ()) -> m Disposable +-- +----observeHashMapDefaultImpl :: forall k v o. (Eq k, Hashable k) => IsDeltaObservable k v o => o -> (HM.HashMap k v -> IO ()) -> IO Disposable +----observeHashMapDefaultImpl o callback = do +---- hashMapRef <- newIORef HM.empty +---- subscribeDelta o (deltaCallback hashMapRef) +---- where +---- deltaCallback :: IORef (HM.HashMap k v) -> Delta k v -> IO () +---- deltaCallback hashMapRef delta = callback =<< atomicModifyIORef' hashMapRef ((\x -> (x, x)) . applyDelta delta) +---- applyDelta :: Delta k v -> HM.HashMap k v -> HM.HashMap k v +---- applyDelta (Reset state) = const state +---- applyDelta (Insert key value) = HM.insert key value +---- applyDelta (Delete key) = HM.delete key +-- +-- +--data DeltaObservable k v = forall o. IsDeltaObservable k v o => DeltaObservable o +--instance IsRetrievable (HM.HashMap k v) (DeltaObservable k v) where +-- retrieve (DeltaObservable o) = retrieve o +--instance IsObservable (HM.HashMap k v) (DeltaObservable k v) where +-- observe (DeltaObservable o) = observe o +--instance IsDeltaObservable k v (DeltaObservable k v) where +-- subscribeDelta (DeltaObservable o) = subscribeDelta o +--instance Functor (DeltaObservable k) where +-- fmap f (DeltaObservable o) = DeltaObservable $ MappedDeltaObservable f o +-- +-- +--data MappedDeltaObservable k b = forall a o. IsDeltaObservable k a o => MappedDeltaObservable (a -> b) o +--instance IsRetrievable (HM.HashMap k b) (MappedDeltaObservable k b) where +-- retrieve (MappedDeltaObservable f o) = fmap f <<$>> retrieve o +--instance IsObservable (HM.HashMap k b) (MappedDeltaObservable k b) where +-- observe (MappedDeltaObservable f o) callback = observe o (callback . fmap (fmap f)) +--instance IsDeltaObservable k b (MappedDeltaObservable k b) where +-- subscribeDelta (MappedDeltaObservable f o) callback = subscribeDelta o (callback . fmap f) diff --git a/src/Quasar/Observable/ObservableHashMap.hs b/src/Quasar/Observable/ObservableHashMap.hs index 8923275..3a0927d 100644 --- a/src/Quasar/Observable/ObservableHashMap.hs +++ b/src/Quasar/Observable/ObservableHashMap.hs @@ -1,165 +1,165 @@ {-# LANGUAGE ViewPatterns #-} module Quasar.Observable.ObservableHashMap ( - ObservableHashMap, - new, - observeKey, - insert, - delete, - lookup, - lookupDelete, + --ObservableHashMap, + --new, + --observeKey, + --insert, + --delete, + --lookup, + --lookupDelete, ) where -import Control.Concurrent.STM (atomically) -import Data.HashMap.Strict qualified as HM -import Quasar.Disposable -import Quasar.Observable -import Quasar.Observable.Delta -import Quasar.Prelude hiding (lookup, lookupDelete) -import Quasar.Utils.ExtraT - - -newtype ObservableHashMap k v = ObservableHashMap (MVar (Handle k v)) -data Handle k v = Handle { - keyHandles :: HM.HashMap k (KeyHandle v), - subscribers :: HM.HashMap Unique (ObservableMessage (HM.HashMap k v) -> IO ()), - deltaSubscribers :: HM.HashMap Unique (Delta k v -> IO ()) -} - -data KeyHandle v = KeyHandle { - value :: Maybe v, - keySubscribers :: HM.HashMap Unique (ObservableMessage (Maybe v) -> IO ()) -} - -instance IsRetrievable (HM.HashMap k v) (ObservableHashMap k v) where - retrieve (ObservableHashMap mvar) = liftIO $ pure . HM.mapMaybe value . keyHandles <$> readMVar mvar -instance IsObservable (HM.HashMap k v) (ObservableHashMap k v) where - observe = undefined --- oldObserve ohm callback = liftIO $ modifyHandle update ohm +--import Control.Concurrent.STM (atomically) +--import Data.HashMap.Strict qualified as HM +--import Quasar.Disposable +--import Quasar.Observable +--import Quasar.Observable.Delta +--import Quasar.Prelude hiding (lookup, lookupDelete) +--import Quasar.Utils.ExtraT +-- +-- +--newtype ObservableHashMap k v = ObservableHashMap (MVar (Handle k v)) +--data Handle k v = Handle { +-- keyHandles :: HM.HashMap k (KeyHandle v), +-- subscribers :: HM.HashMap Unique (ObservableMessage (HM.HashMap k v) -> IO ()), +-- deltaSubscribers :: HM.HashMap Unique (Delta k v -> IO ()) +--} +-- +--data KeyHandle v = KeyHandle { +-- value :: Maybe v, +-- keySubscribers :: HM.HashMap Unique (ObservableMessage (Maybe v) -> IO ()) +--} +-- +--instance IsRetrievable (HM.HashMap k v) (ObservableHashMap k v) where +-- retrieve (ObservableHashMap mvar) = liftIO $ pure . HM.mapMaybe value . keyHandles <$> readMVar mvar +--instance IsObservable (HM.HashMap k v) (ObservableHashMap k v) where +-- observe = undefined +---- oldObserve ohm callback = liftIO $ modifyHandle update ohm +---- where +---- update :: Handle k v -> IO (Handle k v, Disposable) +---- update handle = do +---- callback $ pure $ toHashMap handle +---- key <- newUnique +---- let handle' = handle {subscribers = HM.insert key callback (subscribers handle)} +---- (handle',) <$> newDisposable (unsubscribe key) +---- unsubscribe :: Unique -> IO () +---- unsubscribe key = modifyHandle_ (\handle -> pure handle {subscribers = HM.delete key (subscribers handle)}) ohm +-- +--instance IsDeltaObservable k v (ObservableHashMap k v) where +-- subscribeDelta ohm callback = liftIO $ modifyHandle update ohm -- where -- update :: Handle k v -> IO (Handle k v, Disposable) -- update handle = do --- callback $ pure $ toHashMap handle +-- callback (Reset $ toHashMap handle) -- key <- newUnique --- let handle' = handle {subscribers = HM.insert key callback (subscribers handle)} --- (handle',) <$> newDisposable (unsubscribe key) +-- let handle' = handle {deltaSubscribers = HM.insert key callback (deltaSubscribers handle)} +-- (handle',) <$> atomically (newDisposable (unsubscribe key)) -- unsubscribe :: Unique -> IO () --- unsubscribe key = modifyHandle_ (\handle -> pure handle {subscribers = HM.delete key (subscribers handle)}) ohm - -instance IsDeltaObservable k v (ObservableHashMap k v) where - subscribeDelta ohm callback = liftIO $ modifyHandle update ohm - where - update :: Handle k v -> IO (Handle k v, Disposable) - update handle = do - callback (Reset $ toHashMap handle) - key <- newUnique - let handle' = handle {deltaSubscribers = HM.insert key callback (deltaSubscribers handle)} - (handle',) <$> atomically (newDisposable (unsubscribe key)) - unsubscribe :: Unique -> IO () - unsubscribe key = modifyHandle_ (\handle -> pure handle {deltaSubscribers = HM.delete key (deltaSubscribers handle)}) ohm - - -toHashMap :: Handle k v -> HM.HashMap k v -toHashMap = HM.mapMaybe value . keyHandles - -modifyHandle :: (Handle k v -> IO (Handle k v, a)) -> ObservableHashMap k v -> IO a -modifyHandle f (ObservableHashMap mvar) = modifyMVar mvar f - -modifyHandle_ :: (Handle k v -> IO (Handle k v)) -> ObservableHashMap k v -> IO () -modifyHandle_ f = modifyHandle (fmap (,()) . f) - -modifyKeyHandle :: (Eq k, Hashable k) => (KeyHandle v -> IO (a, KeyHandle v)) -> k -> ObservableHashMap k v -> IO a -modifyKeyHandle f k = modifyHandle (updateKeyHandle f k) - -modifyKeyHandle_ :: forall k v. (Eq k, Hashable k) => (KeyHandle v -> IO (KeyHandle v)) -> k -> ObservableHashMap k v -> IO () -modifyKeyHandle_ f = modifyKeyHandle (fmap ((), ) . f) - -updateKeyHandle :: forall k v a. (Eq k, Hashable k) => (KeyHandle v -> IO (a, KeyHandle v)) -> k -> Handle k v -> IO (Handle k v, a) -updateKeyHandle f k handle = do - (result, keyHandles') <- runExtraT $ HM.alterF updateMaybe k (keyHandles handle) - pure (handle {keyHandles = keyHandles'}, result) - where - updateMaybe :: Maybe (KeyHandle v) -> ExtraT a IO (Maybe (KeyHandle v)) - updateMaybe = fmap toMaybe . (ExtraT . f) . fromMaybe emptyKeyHandle - --updateMaybe = undefined - emptyKeyHandle :: KeyHandle v - emptyKeyHandle = KeyHandle Nothing HM.empty - toMaybe :: KeyHandle v -> Maybe (KeyHandle v) - toMaybe (KeyHandle Nothing (HM.null -> True)) = Nothing - toMaybe keyHandle = Just keyHandle - -modifyKeyHandleNotifying :: (Eq k, Hashable k) => (KeyHandle v -> IO ((a, Maybe (Delta k v)), KeyHandle v)) -> k -> ObservableHashMap k v -> IO a -modifyKeyHandleNotifying f k = modifyHandle $ \handle -> do - (newHandle, (result, delta)) <- updateKeyHandle f k handle - notifySubscribers newHandle delta - pure (newHandle, result) - -modifyKeyHandleNotifying_ :: (Eq k, Hashable k) => (KeyHandle v -> IO (Maybe (Delta k v), KeyHandle v)) -> k -> ObservableHashMap k v -> IO () -modifyKeyHandleNotifying_ f k = modifyHandle_ $ \handle -> do - (newHandle, delta) <- updateKeyHandle f k handle - notifySubscribers newHandle delta - pure newHandle - -notifySubscribers :: Handle k v -> Maybe (Delta k v) -> IO () -notifySubscribers _ Nothing = pure () -notifySubscribers handle@Handle{deltaSubscribers, subscribers} (Just delta) = do - mapM_ ($ delta) $ HM.elems deltaSubscribers - mapM_ ($ pure (toHashMap handle)) $ HM.elems subscribers - -modifyKeySubscribers :: (HM.HashMap Unique (ObservableMessage (Maybe v) -> IO ()) -> HM.HashMap Unique (ObservableMessage (Maybe v) -> IO ())) -> KeyHandle v -> KeyHandle v -modifyKeySubscribers fn keyHandle = keyHandle {keySubscribers = fn (keySubscribers keyHandle)} - -new :: MonadIO m => m (ObservableHashMap k v) -new = liftIO $ ObservableHashMap <$> newMVar Handle{keyHandles=HM.empty, subscribers=HM.empty, deltaSubscribers=HM.empty} - -observeKey :: forall k v. (Eq k, Hashable k) => k -> ObservableHashMap k v -> Observable (Maybe v) -observeKey = undefined ---observeKey key ohm@(ObservableHashMap mvar) = synchronousFnObservable observeFn retrieveFn +-- unsubscribe key = modifyHandle_ (\handle -> pure handle {deltaSubscribers = HM.delete key (deltaSubscribers handle)}) ohm +-- +-- +--toHashMap :: Handle k v -> HM.HashMap k v +--toHashMap = HM.mapMaybe value . keyHandles +-- +--modifyHandle :: (Handle k v -> IO (Handle k v, a)) -> ObservableHashMap k v -> IO a +--modifyHandle f (ObservableHashMap mvar) = modifyMVar mvar f +-- +--modifyHandle_ :: (Handle k v -> IO (Handle k v)) -> ObservableHashMap k v -> IO () +--modifyHandle_ f = modifyHandle (fmap (,()) . f) +-- +--modifyKeyHandle :: (Eq k, Hashable k) => (KeyHandle v -> IO (a, KeyHandle v)) -> k -> ObservableHashMap k v -> IO a +--modifyKeyHandle f k = modifyHandle (updateKeyHandle f k) +-- +--modifyKeyHandle_ :: forall k v. (Eq k, Hashable k) => (KeyHandle v -> IO (KeyHandle v)) -> k -> ObservableHashMap k v -> IO () +--modifyKeyHandle_ f = modifyKeyHandle (fmap ((), ) . f) +-- +--updateKeyHandle :: forall k v a. (Eq k, Hashable k) => (KeyHandle v -> IO (a, KeyHandle v)) -> k -> Handle k v -> IO (Handle k v, a) +--updateKeyHandle f k handle = do +-- (result, keyHandles') <- runExtraT $ HM.alterF updateMaybe k (keyHandles handle) +-- pure (handle {keyHandles = keyHandles'}, result) -- where --- retrieveFn :: IO (Maybe v) --- retrieveFn = liftIO do --- handle <- readMVar mvar --- pure $ join $ fmap value $ HM.lookup key $ keyHandles handle --- observeFn :: ((ObservableMessage (Maybe v) -> IO ()) -> IO Disposable) --- observeFn callback = do --- subscriptionKey <- newUnique --- modifyKeyHandle_ (subscribeFn' subscriptionKey) key ohm --- newDisposable (unsubscribe subscriptionKey) --- where --- subscribeFn' :: Unique -> KeyHandle v -> IO (KeyHandle v) --- subscribeFn' subKey keyHandle@KeyHandle{value} = do --- callback $ pure value --- pure $ modifyKeySubscribers (HM.insert subKey callback) keyHandle --- unsubscribe :: Unique -> IO () --- unsubscribe subKey = modifyKeyHandle_ (pure . modifyKeySubscribers (HM.delete subKey)) key ohm - -insert :: forall k v m. (Eq k, Hashable k, MonadIO m) => k -> v -> ObservableHashMap k v -> m () -insert key value = liftIO . modifyKeyHandleNotifying_ fn key - where - fn :: KeyHandle v -> IO (Maybe (Delta k v), KeyHandle v) - fn keyHandle@KeyHandle{keySubscribers} = do - mapM_ ($ pure $ Just value) $ HM.elems keySubscribers - pure (Just (Insert key value), keyHandle{value=Just value}) - -delete :: forall k v m. (Eq k, Hashable k, MonadIO m) => k -> ObservableHashMap k v -> m () -delete key = liftIO . modifyKeyHandleNotifying_ fn key - where - fn :: KeyHandle v -> IO (Maybe (Delta k v), KeyHandle v) - fn keyHandle@KeyHandle{value=oldValue, keySubscribers} = do - mapM_ ($ pure $ Nothing) $ HM.elems keySubscribers - let delta = if isJust oldValue then Just (Delete key) else Nothing - pure (delta, keyHandle{value=Nothing}) - -lookup :: forall k v m. (Eq k, Hashable k, MonadIO m) => k -> ObservableHashMap k v -> m (Maybe v) -lookup key (ObservableHashMap mvar) = liftIO do - Handle{keyHandles} <- readMVar mvar - pure $ value =<< HM.lookup key keyHandles - -lookupDelete :: forall k v m. (Eq k, Hashable k, MonadIO m) => k -> ObservableHashMap k v -> m (Maybe v) -lookupDelete key = liftIO . modifyKeyHandleNotifying fn key - where - fn :: KeyHandle v -> IO ((Maybe v, Maybe (Delta k v)), KeyHandle v) - fn keyHandle@KeyHandle{value=oldValue, keySubscribers} = do - mapM_ ($ pure $ Nothing) $ HM.elems keySubscribers - let delta = if isJust oldValue then Just (Delete key) else Nothing - pure ((oldValue, delta), keyHandle{value=Nothing}) +-- updateMaybe :: Maybe (KeyHandle v) -> ExtraT a IO (Maybe (KeyHandle v)) +-- updateMaybe = fmap toMaybe . (ExtraT . f) . fromMaybe emptyKeyHandle +-- --updateMaybe = undefined +-- emptyKeyHandle :: KeyHandle v +-- emptyKeyHandle = KeyHandle Nothing HM.empty +-- toMaybe :: KeyHandle v -> Maybe (KeyHandle v) +-- toMaybe (KeyHandle Nothing (HM.null -> True)) = Nothing +-- toMaybe keyHandle = Just keyHandle +-- +--modifyKeyHandleNotifying :: (Eq k, Hashable k) => (KeyHandle v -> IO ((a, Maybe (Delta k v)), KeyHandle v)) -> k -> ObservableHashMap k v -> IO a +--modifyKeyHandleNotifying f k = modifyHandle $ \handle -> do +-- (newHandle, (result, delta)) <- updateKeyHandle f k handle +-- notifySubscribers newHandle delta +-- pure (newHandle, result) +-- +--modifyKeyHandleNotifying_ :: (Eq k, Hashable k) => (KeyHandle v -> IO (Maybe (Delta k v), KeyHandle v)) -> k -> ObservableHashMap k v -> IO () +--modifyKeyHandleNotifying_ f k = modifyHandle_ $ \handle -> do +-- (newHandle, delta) <- updateKeyHandle f k handle +-- notifySubscribers newHandle delta +-- pure newHandle +-- +--notifySubscribers :: Handle k v -> Maybe (Delta k v) -> IO () +--notifySubscribers _ Nothing = pure () +--notifySubscribers handle@Handle{deltaSubscribers, subscribers} (Just delta) = do +-- mapM_ ($ delta) $ HM.elems deltaSubscribers +-- mapM_ ($ pure (toHashMap handle)) $ HM.elems subscribers +-- +--modifyKeySubscribers :: (HM.HashMap Unique (ObservableMessage (Maybe v) -> IO ()) -> HM.HashMap Unique (ObservableMessage (Maybe v) -> IO ())) -> KeyHandle v -> KeyHandle v +--modifyKeySubscribers fn keyHandle = keyHandle {keySubscribers = fn (keySubscribers keyHandle)} +-- +--new :: MonadIO m => m (ObservableHashMap k v) +--new = liftIO $ ObservableHashMap <$> newMVar Handle{keyHandles=HM.empty, subscribers=HM.empty, deltaSubscribers=HM.empty} +-- +--observeKey :: forall k v. (Eq k, Hashable k) => k -> ObservableHashMap k v -> Observable (Maybe v) +--observeKey = undefined +----observeKey key ohm@(ObservableHashMap mvar) = synchronousFnObservable observeFn retrieveFn +---- where +---- retrieveFn :: IO (Maybe v) +---- retrieveFn = liftIO do +---- handle <- readMVar mvar +---- pure $ join $ fmap value $ HM.lookup key $ keyHandles handle +---- observeFn :: ((ObservableMessage (Maybe v) -> IO ()) -> IO Disposable) +---- observeFn callback = do +---- subscriptionKey <- newUnique +---- modifyKeyHandle_ (subscribeFn' subscriptionKey) key ohm +---- newDisposable (unsubscribe subscriptionKey) +---- where +---- subscribeFn' :: Unique -> KeyHandle v -> IO (KeyHandle v) +---- subscribeFn' subKey keyHandle@KeyHandle{value} = do +---- callback $ pure value +---- pure $ modifyKeySubscribers (HM.insert subKey callback) keyHandle +---- unsubscribe :: Unique -> IO () +---- unsubscribe subKey = modifyKeyHandle_ (pure . modifyKeySubscribers (HM.delete subKey)) key ohm +-- +--insert :: forall k v m. (Eq k, Hashable k, MonadIO m) => k -> v -> ObservableHashMap k v -> m () +--insert key value = liftIO . modifyKeyHandleNotifying_ fn key +-- where +-- fn :: KeyHandle v -> IO (Maybe (Delta k v), KeyHandle v) +-- fn keyHandle@KeyHandle{keySubscribers} = do +-- mapM_ ($ pure $ Just value) $ HM.elems keySubscribers +-- pure (Just (Insert key value), keyHandle{value=Just value}) +-- +--delete :: forall k v m. (Eq k, Hashable k, MonadIO m) => k -> ObservableHashMap k v -> m () +--delete key = liftIO . modifyKeyHandleNotifying_ fn key +-- where +-- fn :: KeyHandle v -> IO (Maybe (Delta k v), KeyHandle v) +-- fn keyHandle@KeyHandle{value=oldValue, keySubscribers} = do +-- mapM_ ($ pure $ Nothing) $ HM.elems keySubscribers +-- let delta = if isJust oldValue then Just (Delete key) else Nothing +-- pure (delta, keyHandle{value=Nothing}) +-- +--lookup :: forall k v m. (Eq k, Hashable k, MonadIO m) => k -> ObservableHashMap k v -> m (Maybe v) +--lookup key (ObservableHashMap mvar) = liftIO do +-- Handle{keyHandles} <- readMVar mvar +-- pure $ value =<< HM.lookup key keyHandles +-- +--lookupDelete :: forall k v m. (Eq k, Hashable k, MonadIO m) => k -> ObservableHashMap k v -> m (Maybe v) +--lookupDelete key = liftIO . modifyKeyHandleNotifying fn key +-- where +-- fn :: KeyHandle v -> IO ((Maybe v, Maybe (Delta k v)), KeyHandle v) +-- fn keyHandle@KeyHandle{value=oldValue, keySubscribers} = do +-- mapM_ ($ pure $ Nothing) $ HM.elems keySubscribers +-- let delta = if isJust oldValue then Just (Delete key) else Nothing +-- pure ((oldValue, delta), keyHandle{value=Nothing}) diff --git a/src/Quasar/Observable/ObservablePriority.hs b/src/Quasar/Observable/ObservablePriority.hs index 25eac77..5fb0e8b 100644 --- a/src/Quasar/Observable/ObservablePriority.hs +++ b/src/Quasar/Observable/ObservablePriority.hs @@ -1,125 +1,125 @@ module Quasar.Observable.ObservablePriority ( - ObservablePriority, - create, - insertValue, + --ObservablePriority, + --create, + --insertValue, ) where -import Control.Concurrent.STM (atomically) -import Data.HashMap.Strict qualified as HM -import Data.List (maximumBy) -import Data.List.NonEmpty (NonEmpty(..), nonEmpty) -import Data.List.NonEmpty qualified as NonEmpty -import Data.Ord (comparing) -import Quasar.Disposable -import Quasar.Observable -import Quasar.Prelude - -type Entry v = (Unique, v) - --- | Mutable data structure that stores values of type "v" with an assiciated priority "p". The `IsObservable` instance can be used to get or observe the value with the highest priority. -newtype ObservablePriority p v = ObservablePriority (MVar (Internals p v)) - -instance IsRetrievable (Maybe v) (ObservablePriority p v) where - retrieve (ObservablePriority mvar) = liftIO $ pure . getValueFromInternals <$> readMVar mvar - where - getValueFromInternals :: Internals p v -> Maybe v - getValueFromInternals Internals{current=Nothing} = Nothing - getValueFromInternals Internals{current=Just (_, _, value)} = Just value -instance IsObservable (Maybe v) (ObservablePriority p v) where - observe = undefined - --oldObserve (ObservablePriority mvar) callback = do - -- key <- newUnique - -- modifyMVar_ mvar $ \internals@Internals{subscribers} -> do - -- -- Call listener - -- callback (pure (currentValue internals)) - -- pure internals{subscribers = HM.insert key callback subscribers} - -- newDisposable (unsubscribe key) - -- where - -- unsubscribe :: Unique -> IO () - -- unsubscribe key = modifyMVar_ mvar $ \internals@Internals{subscribers} -> pure internals{subscribers=HM.delete key subscribers} - -type PriorityMap p v = HM.HashMap p (NonEmpty (Entry v)) - -data Internals p v = Internals { - priorityMap :: PriorityMap p v, - current :: Maybe (Unique, p, v), - subscribers :: HM.HashMap Unique (ObservableCallback (Maybe v)) -} - --- | Create a new `ObservablePriority` data structure. -create :: MonadIO m => m (ObservablePriority p v) -create = liftIO do - ObservablePriority <$> newMVar Internals { - priorityMap = HM.empty, - current = Nothing, - subscribers = HM.empty - } - -currentValue :: Internals k v -> Maybe v -currentValue Internals{current} = (\(_, _, value) -> value) <$> current - --- | Insert a value with an assigned priority into the data structure. If the priority is higher than the current highest priority the value will become the current value (and will be sent to subscribers). Otherwise the value will be stored and will only become the current value when all values with a higher priority and all values with the same priority that have been inserted earlier have been removed. --- Returns an `Disposable` that can be used to remove the value from the data structure. -insertValue :: forall p v m. MonadIO m => (Ord p, Hashable p) => ObservablePriority p v -> p -> v -> m Disposable -insertValue (ObservablePriority mvar) priority value = liftIO $ modifyMVar mvar $ \internals -> do - key <- newUnique - newInternals <- insertValue' key internals - (newInternals,) <$> atomically (newDisposable (removeValue key)) - where - insertValue' :: Unique -> Internals p v -> IO (Internals p v) - insertValue' key internals@Internals{priorityMap, current} - | hasToUpdateCurrent current = do - let newInternals = internals{priorityMap=insertEntry priorityMap, current=Just (key, priority, value)} - notifySubscribers newInternals - pure newInternals - | otherwise = pure internals{priorityMap=insertEntry priorityMap} - where - insertEntry :: PriorityMap p v -> PriorityMap p v - insertEntry = HM.alter addToEntryList priority - addToEntryList :: Maybe (NonEmpty (Entry v)) -> Maybe (NonEmpty (Entry v)) - addToEntryList Nothing = Just newEntryList - addToEntryList (Just list) = Just (list <> newEntryList) - newEntryList :: NonEmpty (Entry v) - newEntryList = (key, value) :| [] - - hasToUpdateCurrent :: (Maybe (Unique, p, v)) -> Bool - hasToUpdateCurrent Nothing = True - hasToUpdateCurrent (Just (_, oldPriority, _)) = priority > oldPriority - - removeValue :: Unique -> IO () - removeValue key = modifyMVar_ mvar removeValue' - where - removeValue' :: Internals p v -> IO (Internals p v) - removeValue' internals@Internals{priorityMap, current} = do - let newInternals = internals{priorityMap = removeEntry priorityMap} - if hasToUpdateCurrent current - then updateCurrent newInternals - else pure newInternals - - removeEntry :: PriorityMap p v -> PriorityMap p v - removeEntry = HM.alter removeEntryFromList priority - removeEntryFromList :: Maybe (NonEmpty (Entry v)) -> Maybe (NonEmpty (Entry v)) - removeEntryFromList Nothing = Nothing - removeEntryFromList (Just list) = nonEmpty $ NonEmpty.filter (\(key', _) -> key' /= key) list - - updateCurrent :: Internals p v -> IO (Internals p v) - updateCurrent internals@Internals{priorityMap} = do - let newInternals = internals{current = selectCurrent $ HM.toList priorityMap} - notifySubscribers newInternals - pure newInternals - selectCurrent :: [(p, (NonEmpty (Entry v)))] -> Maybe (Unique, p, v) - selectCurrent [] = Nothing - selectCurrent list = Just . selectCurrentFromList . maximumBy (comparing fst) $ list - where - selectCurrentFromList :: (p, (NonEmpty (Entry v))) -> (Unique, p, v) - selectCurrentFromList (priority', entryList) = (key', priority', value') - where - (key', value') = NonEmpty.head entryList - - hasToUpdateCurrent :: (Maybe (Unique, p, v)) -> Bool - hasToUpdateCurrent Nothing = False - hasToUpdateCurrent (Just (oldKey, _, _)) = key == oldKey - - -notifySubscribers :: forall p v. Internals p v -> IO () -notifySubscribers Internals{subscribers, current} = forM_ subscribers (\callback -> callback (pure ((\(_, _, value) -> value) <$> current))) +--import Control.Concurrent.STM (atomically) +--import Data.HashMap.Strict qualified as HM +--import Data.List (maximumBy) +--import Data.List.NonEmpty (NonEmpty(..), nonEmpty) +--import Data.List.NonEmpty qualified as NonEmpty +--import Data.Ord (comparing) +--import Quasar.Disposable +--import Quasar.Observable +--import Quasar.Prelude +-- +--type Entry v = (Unique, v) +-- +---- | Mutable data structure that stores values of type "v" with an assiciated priority "p". The `IsObservable` instance can be used to get or observe the value with the highest priority. +--newtype ObservablePriority p v = ObservablePriority (MVar (Internals p v)) +-- +--instance IsRetrievable (Maybe v) (ObservablePriority p v) where +-- retrieve (ObservablePriority mvar) = liftIO $ pure . getValueFromInternals <$> readMVar mvar +-- where +-- getValueFromInternals :: Internals p v -> Maybe v +-- getValueFromInternals Internals{current=Nothing} = Nothing +-- getValueFromInternals Internals{current=Just (_, _, value)} = Just value +--instance IsObservable (Maybe v) (ObservablePriority p v) where +-- observe = undefined +-- --oldObserve (ObservablePriority mvar) callback = do +-- -- key <- newUnique +-- -- modifyMVar_ mvar $ \internals@Internals{subscribers} -> do +-- -- -- Call listener +-- -- callback (pure (currentValue internals)) +-- -- pure internals{subscribers = HM.insert key callback subscribers} +-- -- newDisposable (unsubscribe key) +-- -- where +-- -- unsubscribe :: Unique -> IO () +-- -- unsubscribe key = modifyMVar_ mvar $ \internals@Internals{subscribers} -> pure internals{subscribers=HM.delete key subscribers} +-- +--type PriorityMap p v = HM.HashMap p (NonEmpty (Entry v)) +-- +--data Internals p v = Internals { +-- priorityMap :: PriorityMap p v, +-- current :: Maybe (Unique, p, v), +-- subscribers :: HM.HashMap Unique (ObservableCallback (Maybe v)) +--} +-- +---- | Create a new `ObservablePriority` data structure. +--create :: MonadIO m => m (ObservablePriority p v) +--create = liftIO do +-- ObservablePriority <$> newMVar Internals { +-- priorityMap = HM.empty, +-- current = Nothing, +-- subscribers = HM.empty +-- } +-- +--currentValue :: Internals k v -> Maybe v +--currentValue Internals{current} = (\(_, _, value) -> value) <$> current +-- +---- | Insert a value with an assigned priority into the data structure. If the priority is higher than the current highest priority the value will become the current value (and will be sent to subscribers). Otherwise the value will be stored and will only become the current value when all values with a higher priority and all values with the same priority that have been inserted earlier have been removed. +---- Returns an `Disposable` that can be used to remove the value from the data structure. +--insertValue :: forall p v m. MonadIO m => (Ord p, Hashable p) => ObservablePriority p v -> p -> v -> m Disposable +--insertValue (ObservablePriority mvar) priority value = liftIO $ modifyMVar mvar $ \internals -> do +-- key <- newUnique +-- newInternals <- insertValue' key internals +-- (newInternals,) <$> atomically (newDisposable (removeValue key)) +-- where +-- insertValue' :: Unique -> Internals p v -> IO (Internals p v) +-- insertValue' key internals@Internals{priorityMap, current} +-- | hasToUpdateCurrent current = do +-- let newInternals = internals{priorityMap=insertEntry priorityMap, current=Just (key, priority, value)} +-- notifySubscribers newInternals +-- pure newInternals +-- | otherwise = pure internals{priorityMap=insertEntry priorityMap} +-- where +-- insertEntry :: PriorityMap p v -> PriorityMap p v +-- insertEntry = HM.alter addToEntryList priority +-- addToEntryList :: Maybe (NonEmpty (Entry v)) -> Maybe (NonEmpty (Entry v)) +-- addToEntryList Nothing = Just newEntryList +-- addToEntryList (Just list) = Just (list <> newEntryList) +-- newEntryList :: NonEmpty (Entry v) +-- newEntryList = (key, value) :| [] +-- +-- hasToUpdateCurrent :: (Maybe (Unique, p, v)) -> Bool +-- hasToUpdateCurrent Nothing = True +-- hasToUpdateCurrent (Just (_, oldPriority, _)) = priority > oldPriority +-- +-- removeValue :: Unique -> IO () +-- removeValue key = modifyMVar_ mvar removeValue' +-- where +-- removeValue' :: Internals p v -> IO (Internals p v) +-- removeValue' internals@Internals{priorityMap, current} = do +-- let newInternals = internals{priorityMap = removeEntry priorityMap} +-- if hasToUpdateCurrent current +-- then updateCurrent newInternals +-- else pure newInternals +-- +-- removeEntry :: PriorityMap p v -> PriorityMap p v +-- removeEntry = HM.alter removeEntryFromList priority +-- removeEntryFromList :: Maybe (NonEmpty (Entry v)) -> Maybe (NonEmpty (Entry v)) +-- removeEntryFromList Nothing = Nothing +-- removeEntryFromList (Just list) = nonEmpty $ NonEmpty.filter (\(key', _) -> key' /= key) list +-- +-- updateCurrent :: Internals p v -> IO (Internals p v) +-- updateCurrent internals@Internals{priorityMap} = do +-- let newInternals = internals{current = selectCurrent $ HM.toList priorityMap} +-- notifySubscribers newInternals +-- pure newInternals +-- selectCurrent :: [(p, (NonEmpty (Entry v)))] -> Maybe (Unique, p, v) +-- selectCurrent [] = Nothing +-- selectCurrent list = Just . selectCurrentFromList . maximumBy (comparing fst) $ list +-- where +-- selectCurrentFromList :: (p, (NonEmpty (Entry v))) -> (Unique, p, v) +-- selectCurrentFromList (priority', entryList) = (key', priority', value') +-- where +-- (key', value') = NonEmpty.head entryList +-- +-- hasToUpdateCurrent :: (Maybe (Unique, p, v)) -> Bool +-- hasToUpdateCurrent Nothing = False +-- hasToUpdateCurrent (Just (oldKey, _, _)) = key == oldKey +-- +-- +--notifySubscribers :: forall p v. Internals p v -> IO () +--notifySubscribers Internals{subscribers, current} = forM_ subscribers (\callback -> callback (pure ((\(_, _, value) -> value) <$> current))) diff --git a/test/Quasar/Observable/ObservableHashMapSpec.hs b/test/Quasar/Observable/ObservableHashMapSpec.hs index 4181735..f3a5f35 100644 --- a/test/Quasar/Observable/ObservableHashMapSpec.hs +++ b/test/Quasar/Observable/ObservableHashMapSpec.hs @@ -19,122 +19,123 @@ shouldReturnM action expected = do liftIO $ result `shouldBe` expected spec :: Spec -spec = parallel $ do - describe "retrieve" $ do - it "returns the contents of the map" $ io $ withRootResourceManager do - om :: OM.ObservableHashMap String String <- OM.new - (retrieve om >>= await) `shouldReturnM` HM.empty - -- Evaluate unit for coverage - () <- OM.insert "key" "value" om - (retrieve om >>= await) `shouldReturnM` HM.singleton "key" "value" - OM.insert "key2" "value2" om - (retrieve om >>= await) `shouldReturnM` HM.fromList [("key", "value"), ("key2", "value2")] - - describe "subscribe" $ do - xit "calls the callback with the contents of the map" $ io $ withRootResourceManager do - lastCallbackValue <- liftIO $ newIORef unreachableCodePath - - om :: OM.ObservableHashMap String String <- OM.new - subscriptionHandle <- captureDisposable_ $ observe om $ liftIO . writeIORef lastCallbackValue - let lastCallbackShouldBe expected = liftIO do - (ObservableUpdate update) <- readIORef lastCallbackValue - update `shouldBe` expected - - lastCallbackShouldBe HM.empty - OM.insert "key" "value" om - lastCallbackShouldBe (HM.singleton "key" "value") - OM.insert "key2" "value2" om - lastCallbackShouldBe (HM.fromList [("key", "value"), ("key2", "value2")]) - - dispose subscriptionHandle - lastCallbackShouldBe (HM.fromList [("key", "value"), ("key2", "value2")]) - - OM.insert "key3" "value3" om - lastCallbackShouldBe (HM.fromList [("key", "value"), ("key2", "value2")]) - - describe "subscribeDelta" $ do - it "calls the callback with changes to the map" $ io $ withRootResourceManager do - lastDelta <- liftIO $ newIORef unreachableCodePath - - om :: OM.ObservableHashMap String String <- OM.new - subscriptionHandle <- subscribeDelta om $ writeIORef lastDelta - let lastDeltaShouldBe = liftIO . (readIORef lastDelta `shouldReturn`) - - lastDeltaShouldBe $ Reset HM.empty - OM.insert "key" "value" om - lastDeltaShouldBe $ Insert "key" "value" - OM.insert "key" "changed" om - lastDeltaShouldBe $ Insert "key" "changed" - OM.insert "key2" "value2" om - lastDeltaShouldBe $ Insert "key2" "value2" - - dispose subscriptionHandle - lastDeltaShouldBe $ Insert "key2" "value2" - - OM.insert "key3" "value3" om - lastDeltaShouldBe $ Insert "key2" "value2" - - void $ subscribeDelta om $ writeIORef lastDelta - lastDeltaShouldBe $ Reset $ HM.fromList [("key", "changed"), ("key2", "value2"), ("key3", "value3")] - - OM.delete "key2" om - lastDeltaShouldBe $ Delete "key2" - - OM.lookupDelete "key" om `shouldReturnM` Just "changed" - lastDeltaShouldBe $ Delete "key" - - (retrieve om >>= await) `shouldReturnM` HM.singleton "key3" "value3" - - describe "observeKey" $ do - xit "calls key callbacks with the correct value" $ io $ withRootResourceManager do - value1 <- liftIO $ newIORef undefined - value2 <- liftIO $ newIORef undefined - - om :: OM.ObservableHashMap String String <- OM.new - - void $ observe (OM.observeKey "key1" om) (liftIO . writeIORef value1) - let v1ShouldBe expected = liftIO do - (ObservableUpdate update) <- readIORef value1 - update `shouldBe` expected - - v1ShouldBe $ Nothing - - OM.insert "key1" "value1" om - v1ShouldBe $ Just "value1" - - OM.insert "key2" "value2" om - v1ShouldBe $ Just "value1" - - handle2 <- captureDisposable_ $ observe (OM.observeKey "key2" om) (liftIO . writeIORef value2) - let v2ShouldBe expected = liftIO do - (ObservableUpdate update) <- readIORef value2 - update `shouldBe` expected - - v1ShouldBe $ Just "value1" - v2ShouldBe $ Just "value2" - - OM.insert "key2" "changed" om - v1ShouldBe $ Just "value1" - v2ShouldBe $ Just "changed" - - OM.delete "key1" om - v1ShouldBe $ Nothing - v2ShouldBe $ Just "changed" - - -- Delete again (should have no effect) - OM.delete "key1" om - v1ShouldBe $ Nothing - v2ShouldBe $ Just "changed" - - (retrieve om >>= await) `shouldReturnM` HM.singleton "key2" "changed" - dispose handle2 - - OM.lookupDelete "key2" om `shouldReturnM` Just "changed" - v2ShouldBe $ Just "changed" - - OM.lookupDelete "key2" om `shouldReturnM` Nothing - - OM.lookupDelete "key1" om `shouldReturnM` Nothing - v1ShouldBe $ Nothing - - (retrieve om >>= await) `shouldReturnM` HM.empty +spec = pure () +--spec = parallel $ do +-- describe "retrieve" $ do +-- it "returns the contents of the map" $ io $ withRootResourceManager do +-- om :: OM.ObservableHashMap String String <- OM.new +-- (retrieve om >>= await) `shouldReturnM` HM.empty +-- -- Evaluate unit for coverage +-- () <- OM.insert "key" "value" om +-- (retrieve om >>= await) `shouldReturnM` HM.singleton "key" "value" +-- OM.insert "key2" "value2" om +-- (retrieve om >>= await) `shouldReturnM` HM.fromList [("key", "value"), ("key2", "value2")] +-- +-- describe "subscribe" $ do +-- xit "calls the callback with the contents of the map" $ io $ withRootResourceManager do +-- lastCallbackValue <- liftIO $ newIORef unreachableCodePath +-- +-- om :: OM.ObservableHashMap String String <- OM.new +-- subscriptionHandle <- captureDisposable_ $ observe om $ liftIO . writeIORef lastCallbackValue +-- let lastCallbackShouldBe expected = liftIO do +-- (ObservableUpdate update) <- readIORef lastCallbackValue +-- update `shouldBe` expected +-- +-- lastCallbackShouldBe HM.empty +-- OM.insert "key" "value" om +-- lastCallbackShouldBe (HM.singleton "key" "value") +-- OM.insert "key2" "value2" om +-- lastCallbackShouldBe (HM.fromList [("key", "value"), ("key2", "value2")]) +-- +-- dispose subscriptionHandle +-- lastCallbackShouldBe (HM.fromList [("key", "value"), ("key2", "value2")]) +-- +-- OM.insert "key3" "value3" om +-- lastCallbackShouldBe (HM.fromList [("key", "value"), ("key2", "value2")]) +-- +-- describe "subscribeDelta" $ do +-- it "calls the callback with changes to the map" $ io $ withRootResourceManager do +-- lastDelta <- liftIO $ newIORef unreachableCodePath +-- +-- om :: OM.ObservableHashMap String String <- OM.new +-- subscriptionHandle <- subscribeDelta om $ writeIORef lastDelta +-- let lastDeltaShouldBe = liftIO . (readIORef lastDelta `shouldReturn`) +-- +-- lastDeltaShouldBe $ Reset HM.empty +-- OM.insert "key" "value" om +-- lastDeltaShouldBe $ Insert "key" "value" +-- OM.insert "key" "changed" om +-- lastDeltaShouldBe $ Insert "key" "changed" +-- OM.insert "key2" "value2" om +-- lastDeltaShouldBe $ Insert "key2" "value2" +-- +-- dispose subscriptionHandle +-- lastDeltaShouldBe $ Insert "key2" "value2" +-- +-- OM.insert "key3" "value3" om +-- lastDeltaShouldBe $ Insert "key2" "value2" +-- +-- void $ subscribeDelta om $ writeIORef lastDelta +-- lastDeltaShouldBe $ Reset $ HM.fromList [("key", "changed"), ("key2", "value2"), ("key3", "value3")] +-- +-- OM.delete "key2" om +-- lastDeltaShouldBe $ Delete "key2" +-- +-- OM.lookupDelete "key" om `shouldReturnM` Just "changed" +-- lastDeltaShouldBe $ Delete "key" +-- +-- (retrieve om >>= await) `shouldReturnM` HM.singleton "key3" "value3" +-- +-- describe "observeKey" $ do +-- xit "calls key callbacks with the correct value" $ io $ withRootResourceManager do +-- value1 <- liftIO $ newIORef undefined +-- value2 <- liftIO $ newIORef undefined +-- +-- om :: OM.ObservableHashMap String String <- OM.new +-- +-- void $ observe (OM.observeKey "key1" om) (liftIO . writeIORef value1) +-- let v1ShouldBe expected = liftIO do +-- (ObservableUpdate update) <- readIORef value1 +-- update `shouldBe` expected +-- +-- v1ShouldBe $ Nothing +-- +-- OM.insert "key1" "value1" om +-- v1ShouldBe $ Just "value1" +-- +-- OM.insert "key2" "value2" om +-- v1ShouldBe $ Just "value1" +-- +-- handle2 <- captureDisposable_ $ observe (OM.observeKey "key2" om) (liftIO . writeIORef value2) +-- let v2ShouldBe expected = liftIO do +-- (ObservableUpdate update) <- readIORef value2 +-- update `shouldBe` expected +-- +-- v1ShouldBe $ Just "value1" +-- v2ShouldBe $ Just "value2" +-- +-- OM.insert "key2" "changed" om +-- v1ShouldBe $ Just "value1" +-- v2ShouldBe $ Just "changed" +-- +-- OM.delete "key1" om +-- v1ShouldBe $ Nothing +-- v2ShouldBe $ Just "changed" +-- +-- -- Delete again (should have no effect) +-- OM.delete "key1" om +-- v1ShouldBe $ Nothing +-- v2ShouldBe $ Just "changed" +-- +-- (retrieve om >>= await) `shouldReturnM` HM.singleton "key2" "changed" +-- dispose handle2 +-- +-- OM.lookupDelete "key2" om `shouldReturnM` Just "changed" +-- v2ShouldBe $ Just "changed" +-- +-- OM.lookupDelete "key2" om `shouldReturnM` Nothing +-- +-- OM.lookupDelete "key1" om `shouldReturnM` Nothing +-- v1ShouldBe $ Nothing +-- +-- (retrieve om >>= await) `shouldReturnM` HM.empty diff --git a/test/Quasar/Observable/ObservablePrioritySpec.hs b/test/Quasar/Observable/ObservablePrioritySpec.hs index a572b2b..e6456d0 100644 --- a/test/Quasar/Observable/ObservablePrioritySpec.hs +++ b/test/Quasar/Observable/ObservablePrioritySpec.hs @@ -1,13 +1,13 @@ module Quasar.Observable.ObservablePrioritySpec (spec) where -import Control.Monad (void) -import Data.IORef -import Quasar.Awaitable -import Quasar.Disposable -import Quasar.Observable -import Quasar.Observable.ObservablePriority (ObservablePriority) -import Quasar.Observable.ObservablePriority qualified as OP -import Quasar.ResourceManager +--import Control.Monad (void) +--import Data.IORef +--import Quasar.Awaitable +--import Quasar.Disposable +--import Quasar.Observable +--import Quasar.Observable.ObservablePriority (ObservablePriority) +--import Quasar.Observable.ObservablePriority qualified as OP +--import Quasar.ResourceManager import Quasar.Prelude import Test.Hspec @@ -19,37 +19,38 @@ shouldReturnM action expected = do spec :: Spec -spec = do - describe "ObservablePriority" $ parallel $ do - it "can be created" $ io do - void $ OP.create - specify "retrieveIO returns the value with the highest priority" $ io $ withRootResourceManager do - op :: ObservablePriority Int String <- OP.create - p2 <- OP.insertValue op 2 "p2" - (retrieve op >>= await) `shouldReturnM` Just "p2" - p1 <- OP.insertValue op 1 "p1" - (retrieve op >>= await) `shouldReturnM` Just "p2" - dispose p2 - (retrieve op >>= await) `shouldReturnM` Just "p1" - dispose p1 - (retrieve op >>= await) `shouldReturnM` Nothing - xit "sends updates when its value changes" $ io $ withRootResourceManager do - result <- liftIO $ newIORef [] - let mostRecentShouldBe expected = liftIO do - (ObservableUpdate x) <- (head <$> readIORef result) - x `shouldBe` expected - - op :: ObservablePriority Int String <- OP.create - _s <- observe op (liftIO . modifyIORef result . (:)) - mostRecentShouldBe Nothing - p2 <- OP.insertValue op 2 "p2" - - mostRecentShouldBe (Just "p2") - p1 <- OP.insertValue op 1 "p1" - mostRecentShouldBe (Just "p2") - dispose p2 - mostRecentShouldBe (Just "p1") - dispose p1 - mostRecentShouldBe Nothing - - liftIO $ length <$> readIORef result `shouldReturn` 4 +spec = pure () +--spec = do +-- describe "ObservablePriority" $ parallel $ do +-- it "can be created" $ io do +-- void $ OP.create +-- specify "retrieveIO returns the value with the highest priority" $ io $ withRootResourceManager do +-- op :: ObservablePriority Int String <- OP.create +-- p2 <- OP.insertValue op 2 "p2" +-- (retrieve op >>= await) `shouldReturnM` Just "p2" +-- p1 <- OP.insertValue op 1 "p1" +-- (retrieve op >>= await) `shouldReturnM` Just "p2" +-- dispose p2 +-- (retrieve op >>= await) `shouldReturnM` Just "p1" +-- dispose p1 +-- (retrieve op >>= await) `shouldReturnM` Nothing +-- xit "sends updates when its value changes" $ io $ withRootResourceManager do +-- result <- liftIO $ newIORef [] +-- let mostRecentShouldBe expected = liftIO do +-- (ObservableUpdate x) <- (head <$> readIORef result) +-- x `shouldBe` expected +-- +-- op :: ObservablePriority Int String <- OP.create +-- _s <- observe op (liftIO . modifyIORef result . (:)) +-- mostRecentShouldBe Nothing +-- p2 <- OP.insertValue op 2 "p2" +-- +-- mostRecentShouldBe (Just "p2") +-- p1 <- OP.insertValue op 1 "p1" +-- mostRecentShouldBe (Just "p2") +-- dispose p2 +-- mostRecentShouldBe (Just "p1") +-- dispose p1 +-- mostRecentShouldBe Nothing +-- +-- liftIO $ length <$> readIORef result `shouldReturn` 4 diff --git a/test/Quasar/ObservableSpec.hs b/test/Quasar/ObservableSpec.hs index 0df6d1d..97c0a95 100644 --- a/test/Quasar/ObservableSpec.hs +++ b/test/Quasar/ObservableSpec.hs @@ -9,62 +9,63 @@ import Test.Hspec spec :: Spec -spec = do - observableSpec - mergeObservableSpec - -observableSpec :: Spec -observableSpec = parallel do - describe "Observable" do - it "works" $ io do - shouldReturn - do - withRootResourceManager do - observeWhile (pure () :: Observable ()) toObservableUpdate - () - - -mergeObservableSpec :: Spec -mergeObservableSpec = do - describe "mergeObservable" $ parallel $ do - it "merges correctly using retrieveIO" $ io $ withRootResourceManager do - a <- newObservableVar "" - b <- newObservableVar "" - - let mergedObservable = liftA2 (,) (toObservable a) (toObservable b) - let latestShouldBe val = retrieve mergedObservable >>= await >>= liftIO . (`shouldBe` val) - - testSequence a b latestShouldBe - - it "merges correctly using observe" $ io $ withRootResourceManager do - a <- newObservableVar "" - b <- newObservableVar "" - - let mergedObservable = liftA2 (,) (toObservable a) (toObservable b) - (latestRef :: IORef (ObservableMessage (String, String))) <- liftIO $ newIORef (ObservableUpdate ("", "")) - void $ observe mergedObservable (liftIO . writeIORef latestRef) - let latestShouldBe expected = liftIO do - (ObservableUpdate x) <- readIORef latestRef - x `shouldBe` expected - - testSequence a b latestShouldBe - where - testSequence :: ObservableVar String -> ObservableVar String -> ((String, String) -> ResourceManagerIO ()) -> ResourceManagerIO () - testSequence a b latestShouldBe = do - latestShouldBe ("", "") - - setObservableVar a "a0" - latestShouldBe ("a0", "") - - setObservableVar b "b0" - latestShouldBe ("a0", "b0") - - setObservableVar a "a1" - latestShouldBe ("a1", "b0") - - setObservableVar b "b1" - latestShouldBe ("a1", "b1") - - -- No change - setObservableVar a "a1" - latestShouldBe ("a1", "b1") +spec = pure () +--spec = do +-- observableSpec +-- mergeObservableSpec +-- +--observableSpec :: Spec +--observableSpec = parallel do +-- describe "Observable" do +-- it "works" $ io do +-- shouldReturn +-- do +-- withRootResourceManager do +-- observeWhile (pure () :: Observable ()) toObservableUpdate +-- () +-- +-- +--mergeObservableSpec :: Spec +--mergeObservableSpec = do +-- describe "mergeObservable" $ parallel $ do +-- it "merges correctly using retrieveIO" $ io $ withRootResourceManager do +-- a <- newObservableVar "" +-- b <- newObservableVar "" +-- +-- let mergedObservable = liftA2 (,) (toObservable a) (toObservable b) +-- let latestShouldBe val = retrieve mergedObservable >>= await >>= liftIO . (`shouldBe` val) +-- +-- testSequence a b latestShouldBe +-- +-- it "merges correctly using observe" $ io $ withRootResourceManager do +-- a <- newObservableVar "" +-- b <- newObservableVar "" +-- +-- let mergedObservable = liftA2 (,) (toObservable a) (toObservable b) +-- (latestRef :: IORef (ObservableMessage (String, String))) <- liftIO $ newIORef (ObservableUpdate ("", "")) +-- void $ observe mergedObservable (liftIO . writeIORef latestRef) +-- let latestShouldBe expected = liftIO do +-- (ObservableUpdate x) <- readIORef latestRef +-- x `shouldBe` expected +-- +-- testSequence a b latestShouldBe +-- where +-- testSequence :: ObservableVar String -> ObservableVar String -> ((String, String) -> ResourceManagerIO ()) -> ResourceManagerIO () +-- testSequence a b latestShouldBe = do +-- latestShouldBe ("", "") +-- +-- setObservableVar a "a0" +-- latestShouldBe ("a0", "") +-- +-- setObservableVar b "b0" +-- latestShouldBe ("a0", "b0") +-- +-- setObservableVar a "a1" +-- latestShouldBe ("a1", "b0") +-- +-- setObservableVar b "b1" +-- latestShouldBe ("a1", "b1") +-- +-- -- No change +-- setObservableVar a "a1" +-- latestShouldBe ("a1", "b1") -- GitLab