diff --git a/quasar.cabal b/quasar.cabal index a339ef314020b05c12b0ec22fe0bbaf51b662de0..8859d8fcf82e236d29ad20e1ece9469a584976e3 100644 --- a/quasar.cabal +++ b/quasar.cabal @@ -64,6 +64,7 @@ common shared-executable-properties library import: shared-properties exposed-modules: + Quasar.Core Quasar.Observable Quasar.Observable.Delta Quasar.Observable.ObservableHashMap diff --git a/src/Quasar/Core.hs b/src/Quasar/Core.hs new file mode 100644 index 0000000000000000000000000000000000000000..ce8636a03a9309eecce7bdd1f02bc3a0c085a9d4 --- /dev/null +++ b/src/Quasar/Core.hs @@ -0,0 +1,286 @@ +module Quasar.Core ( + IsAsync(..), + Async, + cacheAsync, + AsyncIO, + async, + await, + runAsyncIO, + startAsyncIO, + AsyncVar, + newAsyncVar, + putAsyncVar, + IsDisposable(..), + Disposable, + mkDisposable, + synchronousDisposable, + dummyDisposable, +) where + +import qualified Data.HashMap.Strict as HM +import Quasar.Prelude + +-- * Async + +class IsAsync r a | a -> r where + -- | Wait until the promise is settled and return the result. + wait :: a -> IO r + wait promise = do + mvar <- newEmptyMVar + onResult_ promise (callback mvar) + readMVar mvar + where + callback :: MVar r -> r -> IO () + callback mvar result = do + success <- tryPutMVar mvar result + unless success $ fail "Callback was called multiple times" + + -- | Register a callback, that will be called once the promise is settled. + -- If the promise is already settled, the callback will be called immediately instead. + -- + -- The returned `Disposable` can be used to deregister the callback. + onResult + :: a + -- ^ async + -> (r -> IO ()) + -- ^ callback + -> IO Disposable + + onResult_ :: a -> (r -> IO ()) -> IO () + onResult_ x = void . onResult x + + toAsync :: a -> Async r + toAsync = SomeAsync + + +data Async r + = forall a. IsAsync r a => SomeAsync a + -- '| forall a. Async ((r -> IO ()) -> IO Disposable) + | CompletedAsync r + | forall a. MappedAsync (a -> r) (Async a) + | forall a. BindAsync (Async a) (a -> Async r) + + +instance IsAsync r (Async r) where + onResult :: Async r -> (r -> IO ()) -> IO Disposable + onResult (SomeAsync promise) callback = onResult promise callback + onResult (CompletedAsync result) callback = DummyDisposable <$ callback result + onResult (MappedAsync fn promise) callback = onResult promise $ callback . fn + onResult (BindAsync px fn) callback = do + (disposableMVar :: MVar (Maybe Disposable)) <- newEmptyMVar + d1 <- onResult px $ \x -> + modifyMVar_ disposableMVar $ \case + -- Already disposed + Nothing -> pure Nothing + Just _ -> do + d2 <- onResult (fn x) callback + pure $ Just d2 + putMVar disposableMVar $ Just d1 + pure $ mkDisposable $ do + currentDisposable <- liftIO $ readMVar disposableMVar + dispose currentDisposable + onResult_ :: Async r -> (r -> IO ()) -> IO () + onResult_ (SomeAsync promise) callback = onResult_ promise callback + onResult_ (CompletedAsync result) callback = callback result + onResult_ (MappedAsync fn promise) callback = onResult_ promise $ callback . fn + onResult_ (BindAsync px fn) callback = onResult_ px $ \x -> onResult_ (fn x) callback + toAsync = id + +instance Functor Async where + fmap fn promise = MappedAsync fn promise + +instance Applicative Async where + pure = CompletedAsync + (<*>) pf px = pf >>= \f -> f <$> px + liftA2 f px py = px >>= \x -> f x <$> py + +instance Monad Async where + (>>=) = BindAsync + + +instance Semigroup r => Semigroup (Async r) where + (<>) = liftA2 (<>) + +instance Monoid r => Monoid (Async r) where + mempty = pure mempty + mconcat = fmap mconcat . sequence + + + +newtype AsyncIO r = AsyncIO (IO (Async r)) + +instance Functor AsyncIO where + fmap f (AsyncIO x) = AsyncIO (f <<$>> x) +instance Applicative AsyncIO where + pure = AsyncIO . pure . pure + (<*>) pf px = pf >>= \f -> f <$> px + liftA2 f px py = px >>= \x -> f x <$> py +instance Monad AsyncIO where + lhs >>= fn = AsyncIO $ do + resultVar <- newAsyncVar + lhsAsync <- startAsyncIO lhs + lhsAsync `onResult_` \lhsResult -> do + rhsAsync <- startAsyncIO $ fn lhsResult + rhsAsync `onResult_` putAsyncVar resultVar + pure $ toAsync resultVar + +instance MonadIO AsyncIO where + liftIO = AsyncIO . fmap pure + + +async :: AsyncIO r -> AsyncIO (Async r) +async = liftIO . startAsyncIO + +await :: IsAsync r a => a -> AsyncIO r +await = AsyncIO . pure . toAsync + +-- | Run an `AsyncIO` to completion and return the result. +runAsyncIO :: AsyncIO r -> IO r +runAsyncIO = wait <=< startAsyncIO + + +-- | Run the synchronous part of an `AsyncIO`. Returns an `Async` that can be used to wait for completion of the operation. +startAsyncIO :: AsyncIO r -> IO (Async r) +startAsyncIO (AsyncIO x) = x + +-- ** Forking asyncs + +--class IsAsyncForkable m where +-- asyncThread :: m r -> AsyncIO r + + +-- ** Async implementation + +-- | The default implementation for a `Async` that can be fulfilled later. +data AsyncVar r = AsyncVar (MVar r) (MVar (Maybe (HM.HashMap Unique (r -> IO ())))) + +instance IsAsync r (AsyncVar r) where + wait :: AsyncVar r -> IO r + wait (AsyncVar valueMVar _) = readMVar valueMVar + onResult :: AsyncVar r -> (r -> IO ()) -> IO Disposable + onResult (AsyncVar valueMVar callbackMVar) callback = + modifyMVar callbackMVar $ \case + Just callbacks -> do + key <- newUnique + pure (Just (HM.insert key callback callbacks), removeHandler key) + Nothing -> (Nothing, DummyDisposable) <$ (callback =<< readMVar valueMVar) + where + removeHandler :: Unique -> Disposable + removeHandler key = synchronousDisposable $ modifyMVar_ callbackMVar $ pure . fmap (HM.delete key) + + +newAsyncVar :: IO (AsyncVar r) +newAsyncVar = do + valueMVar <- newEmptyMVar + callbackMVar <- newMVar $ Just HM.empty + pure $ AsyncVar valueMVar callbackMVar + +putAsyncVar :: AsyncVar a -> a -> IO () +putAsyncVar (AsyncVar valueMVar callbackMVar) value = do + success <- tryPutMVar valueMVar value + unless success $ fail "A promise can only be fulfilled once" + putMVar valueMVar value + callbacks <- modifyMVar callbackMVar (pure . (Nothing, ) . concatMap HM.elems) + mapM_ ($ value) callbacks + + +-- ** Async cache + +data CachedAsyncState r = CacheNoCallbacks | CacheHasCallbacks Disposable (HM.HashMap Unique (r -> IO ())) | CacheSettled r +data CachedAsync r = CachedAsync (Async r) (MVar (CachedAsyncState r)) + +instance IsAsync r (CachedAsync r) where + onResult (CachedAsync baseAsync stateMVar) callback = + modifyMVar stateMVar $ \case + CacheNoCallbacks -> do + key <- newUnique + disp <- onResult baseAsync fulfillCache + pure (CacheHasCallbacks disp (HM.singleton key callback), removeHandler key) + CacheHasCallbacks disp callbacks -> do + key <- newUnique + pure (CacheHasCallbacks disp (HM.insert key callback callbacks), removeHandler key) + x@(CacheSettled value) -> (x, DummyDisposable) <$ callback value + where + removeHandler :: Unique -> Disposable + removeHandler key = synchronousDisposable $ modifyMVar_ stateMVar $ \case + CacheHasCallbacks disp callbacks -> do + let newCallbacks = HM.delete key callbacks + if HM.null newCallbacks + then CacheNoCallbacks <$ disposeBlocking disp + else pure (CacheHasCallbacks disp newCallbacks) + state -> pure state + fulfillCache :: r -> IO () + fulfillCache value = do + callbacks <- modifyMVar stateMVar $ \case + CacheHasCallbacks _ callbacks -> pure (CacheSettled value, HM.elems callbacks) + CacheNoCallbacks -> pure (CacheSettled value, []) + CacheSettled _ -> fail "Callback was called multiple times" + mapM_ ($ value) callbacks + +cacheAsync :: IsAsync r p => p -> IO (Async r) +cacheAsync promise = toAsync . CachedAsync (toAsync promise) <$> newMVar CacheNoCallbacks + +-- * Disposable + +class IsDisposable a where + -- TODO document laws: must not throw exceptions, is idempotent + + -- | Dispose a resource. When the resource has been released the promise is fulfilled. + dispose :: a -> AsyncIO () + dispose = liftIO . disposeBlocking + -- | Dispose a resource. Returns without waiting for the resource to be released. + disposeEventually :: a -> IO () + disposeEventually = void . startAsyncIO . dispose + -- | Dispose a resource. Blocks until the resource is released. + disposeBlocking :: a -> IO () + disposeBlocking = runAsyncIO . dispose + + toDisposable :: a -> Disposable + toDisposable = SomeDisposable + + {-# MINIMAL dispose | disposeBlocking #-} + +instance IsDisposable a => IsDisposable (Maybe a) where + dispose = mapM_ dispose + disposeEventually = mapM_ disposeEventually + disposeBlocking = mapM_ disposeBlocking + + +data Disposable + = forall a. IsDisposable a => SomeDisposable a + | Disposable (AsyncIO ()) + | MultiDisposable [Disposable] + | DummyDisposable + +instance IsDisposable Disposable where + dispose (SomeDisposable x) = dispose x + dispose (Disposable fn) = fn + dispose (MultiDisposable disposables) = mconcat <$> mapM dispose disposables + dispose DummyDisposable = pure () + + disposeEventually (SomeDisposable x) = disposeEventually x + disposeEventually x = void . startAsyncIO . dispose $ x + + disposeBlocking (SomeDisposable x) = disposeBlocking x + disposeBlocking x = runAsyncIO . dispose $ x + + toDisposable = id + +instance Semigroup Disposable where + MultiDisposable x <> MultiDisposable y = MultiDisposable (x <> y) + x <> MultiDisposable y = MultiDisposable (x : y) + x <> y = MultiDisposable [x, y] + +instance Monoid Disposable where + mempty = DummyDisposable + mconcat = MultiDisposable + + +mkDisposable :: AsyncIO () -> Disposable +mkDisposable = Disposable + +synchronousDisposable :: IO () -> Disposable +synchronousDisposable = mkDisposable . liftIO + +dummyDisposable :: Disposable +dummyDisposable = mempty diff --git a/src/Quasar/Observable.hs b/src/Quasar/Observable.hs index e0b3b9151b8ca313bd042e3bee3fddda4f9f2b78..73ff419da6ce29332e928f2ee7b22fb061b338c5 100644 --- a/src/Quasar/Observable.hs +++ b/src/Quasar/Observable.hs @@ -3,12 +3,11 @@ module Quasar.Observable ( Observable(..), IsGettable(..), + getBlocking, IsObservable(..), - unsafeGetValue, + unsafeGetBlocking, subscribe', IsSettable(..), - Disposable(..), - IsDisposable(..), ObservableCallback, ObservableMessage, MessageReason(..), @@ -27,34 +26,18 @@ module Quasar.Observable ( mergeObservableMaybe, constObservable, FnObservable(..), - waitFor, - waitFor', ) where import Control.Concurrent.MVar -import Control.Exception (Exception) import Control.Monad.Except import Control.Monad.Trans.Maybe import Data.Binary (Binary) import qualified Data.HashMap.Strict as HM import Data.IORef import Data.Unique +import Quasar.Core import Quasar.Prelude -waitFor :: forall a. ((a -> IO ()) -> IO ()) -> IO a -waitFor action = do - mvar <- newEmptyMVar - action (callback mvar) - readMVar mvar - where - callback :: MVar a -> a -> IO () - callback mvar result = do - success <- tryPutMVar mvar result - unless success $ fail "Callback was called multiple times" - -waitFor' :: (IO () -> IO ()) -> IO () -waitFor' action = waitFor $ \callback -> action (callback ()) - data MessageReason = Current | Update deriving (Eq, Show, Generic) @@ -62,78 +45,29 @@ instance Binary MessageReason type ObservableMessage v = (MessageReason, v) -mapObservableMessage :: Monad m => (a -> m b) -> ObservableMessage a -> m (ObservableMessage b) -mapObservableMessage f (r, s) = (r, ) <$> f s - -data Disposable - = forall a. IsDisposable a => SomeDisposable a - | FunctionDisposable (IO () -> IO ()) - | MultiDisposable [Disposable] - | DummyDisposable - -instance IsDisposable Disposable where - dispose (SomeDisposable x) = dispose x - dispose (FunctionDisposable fn) = fn (pure ()) - dispose d@(MultiDisposable _) = waitFor' $ dispose' d - dispose DummyDisposable = pure () - dispose_ (SomeDisposable x) = dispose_ x - dispose_ (FunctionDisposable fn) = fn (pure ()) - dispose_ d@(MultiDisposable _) = waitFor' $ dispose' d - dispose_ DummyDisposable = pure () - dispose' (SomeDisposable x) = dispose' x - dispose' (FunctionDisposable fn) = fn - dispose' (MultiDisposable xs) = \disposeCallback -> do - mvars <- mapM startDispose xs - forM_ mvars $ readMVar - disposeCallback - where - startDispose :: Disposable -> IO (MVar ()) - startDispose disposable = do - mvar <- newEmptyMVar - dispose' disposable (callback mvar) - pure (mvar :: MVar ()) - callback :: MVar () -> IO () - callback mvar = do - success <- tryPutMVar mvar () - unless success $ fail "Callback was called multiple times" - dispose' DummyDisposable = id - -class IsDisposable a where - -- | Dispose a resource. Blocks until the resource is released. - dispose :: a -> IO () - dispose = waitFor' . dispose' - -- | Dispose a resource. Returns without waiting for the resource to be released. - dispose_ :: a -> IO () - dispose_ disposable = dispose' disposable (pure ()) - -- | Dispose a resource. When the resource has been released the callback is invoked. - dispose' :: a -> IO () -> IO () -instance IsDisposable a => IsDisposable (Maybe a) where - dispose' (Just disposable) callback = dispose' disposable callback - dispose' Nothing callback = callback +mapObservableMessage :: (a -> b) -> ObservableMessage a -> ObservableMessage b +mapObservableMessage f (reason, x) = (reason, f x) class IsGettable v a | a -> v where - getValue :: a -> IO v - getValue = waitFor . getValue' - getValue' :: a -> (v -> IO ()) -> IO () - getValue' gettable callback = getValue gettable >>= callback - {-# MINIMAL getValue | getValue' #-} + getValue :: a -> AsyncIO v + +getBlocking :: IsGettable v a => a -> IO v +getBlocking = runAsyncIO . getValue + class IsGettable v o => IsObservable v o | o -> v where subscribe :: o -> (ObservableMessage v -> IO ()) -> IO Disposable + toObservable :: o -> Observable v toObservable = Observable - mapObservable :: (v -> a) -> o -> Observable a - mapObservable f = mapObservableM (pure . f) - mapObservableM :: (v -> IO a) -> o -> Observable a - mapObservableM f = Observable . MappedObservable f -instance IsGettable a ((a -> IO ()) -> IO ()) where - getValue' = id + mapObservable :: (v -> a) -> o -> Observable a + mapObservable f = Observable . MappedObservable f --- | Variant of `getValue` that throws exceptions instead of returning them. -unsafeGetValue :: (Exception e, IsObservable (Either e v) o) => o -> IO v -unsafeGetValue = either throwIO pure <=< getValue +-- | Variant of `getBlocking` that throws exceptions instead of returning them. +unsafeGetBlocking :: (Exception e, IsObservable (Either e v) o) => o -> IO v +unsafeGetBlocking = either throwIO pure <=< getBlocking -- | A variant of `subscribe` that passes the `Disposable` to the callback. subscribe' :: IsObservable v o => o -> (Disposable -> ObservableMessage v -> IO ()) -> IO Disposable @@ -143,8 +77,9 @@ type ObservableCallback v = ObservableMessage v -> IO () instance IsGettable v o => IsGettable v (IO o) where - getValue :: IO o -> IO v - getValue getGettable = getValue =<< getGettable + getValue :: IO o -> AsyncIO v + getValue = getValue <=< liftIO + instance IsObservable v o => IsObservable v (IO o) where subscribe :: IO o -> (ObservableMessage v -> IO ()) -> IO Disposable subscribe getObservable callback = do @@ -164,7 +99,6 @@ instance IsObservable v (Observable v) where subscribe (Observable o) = subscribe o toObservable = id mapObservable f (Observable o) = mapObservable f o - mapObservableM f (Observable o) = mapObservableM f o instance Functor Observable where fmap f = mapObservable f @@ -178,17 +112,17 @@ instance Monad Observable where (>>=) = bindObservable -data MappedObservable b = forall a o. IsObservable a o => MappedObservable (a -> IO b) o +data MappedObservable b = forall a o. IsObservable a o => MappedObservable (a -> b) o instance IsGettable v (MappedObservable v) where - getValue (MappedObservable f observable) = f =<< getValue observable + getValue (MappedObservable f observable) = f <$> getValue observable instance IsObservable v (MappedObservable v) where - subscribe (MappedObservable f observable) callback = subscribe observable (callback <=< mapObservableMessage f) - mapObservableM f1 (MappedObservable f2 upstream) = Observable $ MappedObservable (f1 <=< f2) upstream + subscribe (MappedObservable f observable) callback = subscribe observable (callback . mapObservableMessage f) + mapObservable f1 (MappedObservable f2 upstream) = Observable $ MappedObservable (f1 . f2) upstream newtype ObservableVar v = ObservableVar (MVar (v, HM.HashMap Unique (ObservableCallback v))) instance IsGettable v (ObservableVar v) where - getValue (ObservableVar mvar) = fst <$> readMVar mvar + getValue (ObservableVar mvar) = liftIO $ fst <$> readMVar mvar instance IsObservable v (ObservableVar v) where subscribe (ObservableVar mvar) callback = do key <- newUnique @@ -196,12 +130,10 @@ instance IsObservable v (ObservableVar v) where -- Call listener callback (Current, state) pure (state, HM.insert key callback subscribers) - pure $ FunctionDisposable (disposeFn key) + pure $ synchronousDisposable (disposeFn key) where - disposeFn :: Unique -> IO () -> IO () - disposeFn key disposeCallback = do - modifyMVar_ mvar (\(state, subscribers) -> pure (state, HM.delete key subscribers)) - disposeCallback + disposeFn :: Unique -> IO () + disposeFn key = modifyMVar_ mvar (\(state, subscribers) -> pure (state, HM.delete key subscribers)) instance IsSettable v (ObservableVar v) where setValue (ObservableVar mvar) value = modifyMVar_ mvar $ \(_, subscribers) -> do @@ -239,22 +171,27 @@ bindObservable x fy = joinObservable $ mapObservable fy x newtype JoinedObservable o = JoinedObservable o instance forall o i v. (IsGettable i o, IsGettable v i) => IsGettable v (JoinedObservable o) where - getValue :: JoinedObservable o -> IO v + getValue :: JoinedObservable o -> AsyncIO v getValue (JoinedObservable outer) = getValue =<< getValue outer instance forall o i v. (IsObservable i o, IsObservable v i) => IsObservable v (JoinedObservable o) where subscribe :: (JoinedObservable o) -> (ObservableMessage v -> IO ()) -> IO Disposable subscribe (JoinedObservable outer) callback = do - innerSubscriptionMVar <- newMVar DummyDisposable - outerSubscription <- subscribe outer (outerCallback innerSubscriptionMVar) - pure $ FunctionDisposable (\disposeCallback -> dispose' outerSubscription (readMVar innerSubscriptionMVar >>= \innerSubscription -> dispose' innerSubscription disposeCallback)) + -- TODO: rewrite with latest semantics + -- the current implementation blocks the callback while `dispose` is running + innerDisposableMVar <- newMVar dummyDisposable + outerDisposable <- subscribe outer (outerCallback innerDisposableMVar) + pure $ mkDisposable $ do + dispose outerDisposable + dispose =<< liftIO (readMVar innerDisposableMVar) where - outerCallback innerSubscriptionMVar = outerCallback' - where - outerCallback' (_reason, innerObservable) = do - oldInnerSubscription <- takeMVar innerSubscriptionMVar - dispose' oldInnerSubscription $ do - newInnerSubscription <- subscribe innerObservable callback - putMVar innerSubscriptionMVar newInnerSubscription + outerCallback :: MVar Disposable -> ObservableMessage i -> IO () + outerCallback innerDisposableMVar (_reason, innerObservable) = do + oldInnerSubscription <- takeMVar innerDisposableMVar + void $ startAsyncIO $ do + dispose oldInnerSubscription + liftIO $ do + newInnerSubscription <- subscribe innerObservable callback + putMVar innerDisposableMVar newInnerSubscription joinObservable :: (IsObservable i o, IsObservable v i) => o -> Observable v joinObservable = Observable . JoinedObservable @@ -276,16 +213,13 @@ joinObservableEither' = runExceptT . join . fmap (ExceptT . toObservable) . Exce data MergedObservable o0 v0 o1 v1 r = MergedObservable (v0 -> v1 -> r) o0 o1 instance forall o0 v0 o1 v1 r. (IsGettable v0 o0, IsGettable v1 o1) => IsGettable r (MergedObservable o0 v0 o1 v1 r) where - getValue (MergedObservable merge obs0 obs1) = do - x0 <- getValue obs0 - x1 <- getValue obs1 - pure $ merge x0 x1 + getValue (MergedObservable merge obs0 obs1) = merge <$> getValue obs0 <*> getValue obs1 instance forall o0 v0 o1 v1 r. (IsObservable v0 o0, IsObservable v1 o1) => IsObservable r (MergedObservable o0 v0 o1 v1 r) where subscribe (MergedObservable merge obs0 obs1) callback = do currentValuesTupleRef <- newIORef (Nothing, Nothing) sub0 <- subscribe obs0 (mergeCallback currentValuesTupleRef . fmap Left) sub1 <- subscribe obs1 (mergeCallback currentValuesTupleRef . fmap Right) - pure $ MultiDisposable [sub0, sub1] + pure $ mconcat [sub0, sub1] where mergeCallback :: IORef (Maybe v0, Maybe v1) -> (MessageReason, Either v0 v1) -> IO () mergeCallback currentValuesTupleRef (reason, state) = do @@ -311,16 +245,16 @@ mergeObservableMaybe merge x y = Observable $ MergedObservable (liftA2 merge) x -- | Data type that can be used as an implementation for the `IsObservable` interface that works by directly providing functions for `getValue` and `subscribe`. data FnObservable v = FnObservable { - getValueFn :: IO v, + getValueFn :: AsyncIO v, subscribeFn :: (ObservableMessage v -> IO ()) -> IO Disposable } instance IsGettable v (FnObservable v) where getValue o = getValueFn o instance IsObservable v (FnObservable v) where subscribe o = subscribeFn o - mapObservableM f FnObservable{getValueFn, subscribeFn} = Observable $ FnObservable { - getValueFn = getValueFn >>= f, - subscribeFn = \listener -> subscribeFn (mapObservableMessage f >=> listener) + mapObservable f FnObservable{getValueFn, subscribeFn} = Observable $ FnObservable { + getValueFn = f <$> getValueFn, + subscribeFn = \listener -> subscribeFn (listener . mapObservableMessage f) } @@ -330,12 +264,13 @@ instance IsGettable a (ConstObservable a) where instance IsObservable a (ConstObservable a) where subscribe (ConstObservable x) callback = do callback (Current, x) - pure DummyDisposable + pure dummyDisposable + -- | Create an observable that contains a constant value. constObservable :: a -> Observable a constObservable = Observable . ConstObservable -- TODO implement -_cacheObservable :: IsObservable v o => o -> Observable v -_cacheObservable = Observable +--cacheObservable :: IsObservable v o => o -> Observable v +--cacheObservable = undefined diff --git a/src/Quasar/Observable/Delta.hs b/src/Quasar/Observable/Delta.hs index 3bdd0b633f704718c34aa926017f34ca1b698e65..93c2bc27a841cfef6c7b7f09d282dfbdc9a8efe9 100644 --- a/src/Quasar/Observable/Delta.hs +++ b/src/Quasar/Observable/Delta.hs @@ -1,11 +1,13 @@ module Quasar.Observable.Delta where +import Quasar.Core import Quasar.Observable import Quasar.Prelude --import Conduit import qualified Data.HashMap.Strict as HM -import Data.Binary (Binary(..)) +import Data.Binary (Binary) +import qualified Data.Binary as B import Data.IORef import Data.Word (Word8) @@ -17,21 +19,18 @@ instance Functor (Delta k) where fmap _ (Delete key) = Delete key instance (Eq k, Hashable k, Binary k, Binary v) => Binary (Delta k v) where get = do - (tag :: Word8) <- get + (tag :: Word8) <- B.get case tag of - 0 -> Reset . HM.fromList <$> get - 1 -> Insert <$> get <*> get - 2 -> Delete <$> get + 0 -> Reset . HM.fromList <$> B.get + 1 -> Insert <$> B.get <*> B.get + 2 -> Delete <$> B.get _ -> fail "Invalid tag" - put (Reset hashmap) = put (0 :: Word8) >> put (HM.toList hashmap) - put (Insert key value) = put (1 :: Word8) >> put key >> put value - put (Delete key) = put (2 :: Word8) >> put key + put (Reset hashmap) = B.put (0 :: Word8) >> B.put (HM.toList hashmap) + put (Insert key value) = B.put (1 :: Word8) >> B.put key >> B.put value + put (Delete key) = B.put (2 :: Word8) >> B.put key class IsObservable (HM.HashMap k v) o => IsDeltaObservable k v o | o -> k, o -> v where subscribeDelta :: o -> (Delta k v -> IO ()) -> IO Disposable - --subscribeDeltaC :: o -> ConduitT () (Delta k v) IO () - --subscribeDeltaC = undefined - --{-# MINIMAL subscribeDelta | subscribeDeltaC #-} observeHashMapDefaultImpl :: forall k v o. (Eq k, Hashable k) => IsDeltaObservable k v o => o -> (HM.HashMap k v -> IO ()) -> IO Disposable observeHashMapDefaultImpl o callback = do @@ -39,7 +38,7 @@ observeHashMapDefaultImpl o callback = do subscribeDelta o (deltaCallback hashMapRef) where deltaCallback :: IORef (HM.HashMap k v) -> Delta k v -> IO () - deltaCallback hashMapRef delta = callback =<< atomicModifyIORef' hashMapRef ((\x -> (x, x)) . (applyDelta delta)) + deltaCallback hashMapRef delta = callback =<< atomicModifyIORef' hashMapRef ((\x -> (x, x)) . applyDelta delta) applyDelta :: Delta k v -> HM.HashMap k v -> HM.HashMap k v applyDelta (Reset state) = const state applyDelta (Insert key value) = HM.insert key value @@ -59,7 +58,7 @@ instance Functor (DeltaObservable k) where data MappedDeltaObservable k b = forall a o. IsDeltaObservable k a o => MappedDeltaObservable (a -> b) o instance IsGettable (HM.HashMap k b) (MappedDeltaObservable k b) where - getValue (MappedDeltaObservable f o) = fmap f <$> getValue o + getValue (MappedDeltaObservable f o) = f <<$>> getValue o instance IsObservable (HM.HashMap k b) (MappedDeltaObservable k b) where subscribe (MappedDeltaObservable f o) callback = subscribe o (callback . fmap (fmap f)) instance IsDeltaObservable k b (MappedDeltaObservable k b) where diff --git a/src/Quasar/Observable/ObservableHashMap.hs b/src/Quasar/Observable/ObservableHashMap.hs index a0f3a7fb7e90a9c8cb2cbc698ca577fe26726b04..e0673e7b7017f5a6756b77594a02f4b64c969790 100644 --- a/src/Quasar/Observable/ObservableHashMap.hs +++ b/src/Quasar/Observable/ObservableHashMap.hs @@ -10,15 +10,14 @@ module Quasar.Observable.ObservableHashMap ( lookupDelete, ) where +import Quasar.Core import Quasar.Observable import Quasar.Observable.Delta import Quasar.Prelude hiding (lookup, lookupDelete) import Quasar.Utils.ExtraT -import Control.Concurrent.MVar import qualified Data.HashMap.Strict as HM import Data.Maybe (isJust) -import Data.Unique import Language.Haskell.TH.Syntax (mkName, nameBase) import Lens.Micro.Platform @@ -32,14 +31,14 @@ data Handle k v = Handle { data KeyHandle v = KeyHandle { value :: Maybe v, - keySubscribers :: (HM.HashMap Unique (ObservableMessage (Maybe v) -> IO ())) + keySubscribers :: HM.HashMap Unique (ObservableMessage (Maybe v) -> IO ()) } makeLensesWith (lensField .~ (\_ _ -> pure . TopName . mkName . ("_" <>) . nameBase) $ lensRules) ''Handle makeLensesWith (lensField .~ (\_ _ -> pure . TopName . mkName . ("_" <>) . nameBase) $ lensRules) ''KeyHandle instance IsGettable (HM.HashMap k v) (ObservableHashMap k v) where - getValue (ObservableHashMap mvar) = HM.mapMaybe value . keyHandles <$> readMVar mvar + getValue (ObservableHashMap mvar) = liftIO $ HM.mapMaybe value . keyHandles <$> readMVar mvar instance IsObservable (HM.HashMap k v) (ObservableHashMap k v) where subscribe ohm callback = modifyHandle update ohm where @@ -48,11 +47,9 @@ instance IsObservable (HM.HashMap k v) (ObservableHashMap k v) where callback (Current, toHashMap handle) unique <- newUnique let handle' = handle & set (_subscribers . at unique) (Just callback) - pure (handle', FunctionDisposable $ unsubscribe unique) - unsubscribe :: Unique -> IO () -> IO () - unsubscribe unique unsubscribedCallback = do - modifyHandle_ (pure . set (_subscribers . at unique) Nothing) ohm - unsubscribedCallback + pure (handle', synchronousDisposable (unsubscribe unique)) + unsubscribe :: Unique -> IO () + unsubscribe unique = modifyHandle_ (pure . set (_subscribers . at unique) Nothing) ohm instance IsDeltaObservable k v (ObservableHashMap k v) where subscribeDelta ohm callback = modifyHandle update ohm @@ -62,11 +59,9 @@ instance IsDeltaObservable k v (ObservableHashMap k v) where callback (Reset $ toHashMap handle) unique <- newUnique let handle' = handle & set (_deltaSubscribers . at unique) (Just callback) - pure (handle', FunctionDisposable $ unsubscribe unique) - unsubscribe :: Unique -> IO () -> IO () - unsubscribe unique unsubscribedCallback = do - modifyHandle_ (pure . set (_deltaSubscribers . at unique) Nothing) ohm - unsubscribedCallback + pure (handle', synchronousDisposable (unsubscribe unique)) + unsubscribe :: Unique -> IO () + unsubscribe unique = modifyHandle_ (pure . set (_deltaSubscribers . at unique) Nothing) ohm toHashMap :: Handle k v -> HM.HashMap k v @@ -114,7 +109,7 @@ notifySubscribers handle@Handle{deltaSubscribers, subscribers} (Just delta) = do mapM_ ($ (Update, toHashMap handle)) $ HM.elems subscribers modifyKeySubscribers :: (HM.HashMap Unique (ObservableMessage (Maybe v) -> IO ()) -> HM.HashMap Unique (ObservableMessage (Maybe v) -> IO ())) -> KeyHandle v -> KeyHandle v -modifyKeySubscribers f = over _keySubscribers f +modifyKeySubscribers = over _keySubscribers new :: IO (ObservableHashMap k v) new = ObservableHashMap <$> newMVar Handle{keyHandles=HM.empty, subscribers=HM.empty, deltaSubscribers=HM.empty} @@ -122,22 +117,20 @@ new = ObservableHashMap <$> newMVar Handle{keyHandles=HM.empty, subscribers=HM.e observeKey :: forall k v. (Eq k, Hashable k) => k -> ObservableHashMap k v -> Observable (Maybe v) observeKey key ohm@(ObservableHashMap mvar) = Observable FnObservable{getValueFn, subscribeFn} where - getValueFn :: IO (Maybe v) - getValueFn = join . preview (_keyHandles . at key . _Just . _value) <$> readMVar mvar + getValueFn :: AsyncIO (Maybe v) + getValueFn = liftIO $ join . preview (_keyHandles . at key . _Just . _value) <$> readMVar mvar subscribeFn :: ((ObservableMessage (Maybe v) -> IO ()) -> IO Disposable) subscribeFn callback = do subscriptionKey <- newUnique modifyKeyHandle_ (subscribeFn' subscriptionKey) key ohm - pure $ FunctionDisposable $ unsubscribe subscriptionKey + pure $ synchronousDisposable (unsubscribe subscriptionKey) where subscribeFn' :: Unique -> KeyHandle v -> IO (KeyHandle v) subscribeFn' subKey keyHandle@KeyHandle{value} = do callback (Current, value) pure $ modifyKeySubscribers (HM.insert subKey callback) keyHandle - unsubscribe :: Unique -> IO () -> IO () - unsubscribe subKey unsubscribedCallback = do - modifyKeyHandle_ (pure . modifyKeySubscribers (HM.delete subKey)) key ohm - unsubscribedCallback + unsubscribe :: Unique -> IO () + unsubscribe subKey = modifyKeyHandle_ (pure . modifyKeySubscribers (HM.delete subKey)) key ohm insert :: forall k v. (Eq k, Hashable k) => k -> v -> ObservableHashMap k v -> IO () insert key value = modifyKeyHandleNotifying_ fn key diff --git a/src/Quasar/Observable/ObservablePriority.hs b/src/Quasar/Observable/ObservablePriority.hs index d7f4415734b51f3b033410bdfbef30b00dff8ebc..86a23d822331f1eddc9cda40575b0f15baf2c698 100644 --- a/src/Quasar/Observable/ObservablePriority.hs +++ b/src/Quasar/Observable/ObservablePriority.hs @@ -4,24 +4,23 @@ module Quasar.Observable.ObservablePriority ( insertValue, ) where +import Quasar.Core import Quasar.Observable import Quasar.Prelude -import Control.Concurrent.MVar import qualified Data.HashMap.Strict as HM import Data.List (maximumBy) import Data.List.NonEmpty (NonEmpty(..), nonEmpty) import qualified Data.List.NonEmpty as NonEmpty import Data.Ord (comparing) -import Data.Unique type Entry v = (Unique, v) -- | Mutable data structure that stores values of type "v" with an assiciated priority "p". The `IsObservable` instance can be used to get or observe the value with the highest priority. -data ObservablePriority p v = ObservablePriority (MVar (Internals p v)) +newtype ObservablePriority p v = ObservablePriority (MVar (Internals p v)) instance IsGettable (Maybe v) (ObservablePriority p v) where - getValue (ObservablePriority mvar) = getValueFromInternals <$> readMVar mvar + getValue (ObservablePriority mvar) = liftIO $ getValueFromInternals <$> readMVar mvar where getValueFromInternals :: Internals p v -> Maybe v getValueFromInternals Internals{current=Nothing} = Nothing @@ -33,12 +32,10 @@ instance IsObservable (Maybe v) (ObservablePriority p v) where -- Call listener callback (Current, currentValue internals) pure internals{subscribers = HM.insert key callback subscribers} - pure $ FunctionDisposable (unsubscribe key) + pure $ synchronousDisposable (unsubscribe key) where - unsubscribe :: Unique -> IO () -> IO () - unsubscribe key disposeCallback = do - modifyMVar_ mvar $ \internals@Internals{subscribers} -> pure internals{subscribers=HM.delete key subscribers} - disposeCallback + unsubscribe :: Unique -> IO () + unsubscribe key = modifyMVar_ mvar $ \internals@Internals{subscribers} -> pure internals{subscribers=HM.delete key subscribers} type PriorityMap p v = HM.HashMap p (NonEmpty (Entry v)) @@ -65,7 +62,7 @@ insertValue :: forall p v. (Ord p, Hashable p) => ObservablePriority p v -> p -> insertValue (ObservablePriority mvar) priority value = modifyMVar mvar $ \internals -> do key <- newUnique newInternals <- insertValue' key internals - pure (newInternals, FunctionDisposable (\callback -> removeValue key >> callback)) + pure (newInternals, synchronousDisposable (removeValue key)) where insertValue' :: Unique -> Internals p v -> IO (Internals p v) insertValue' key internals@Internals{priorityMap, current} diff --git a/test/Quasar/Observable/ObservableHashMapSpec.hs b/test/Quasar/Observable/ObservableHashMapSpec.hs index c6c7781c97ad587f37001c215f061b2b7b747d77..7cdbd02391042bea173715726f4fdd2978b2b8a4 100644 --- a/test/Quasar/Observable/ObservableHashMapSpec.hs +++ b/test/Quasar/Observable/ObservableHashMapSpec.hs @@ -1,5 +1,6 @@ module Quasar.Observable.ObservableHashMapSpec where +import Quasar.Core import Quasar.Observable import Quasar.Observable.Delta import qualified Quasar.Observable.ObservableHashMap as OM @@ -12,15 +13,15 @@ import Test.Hspec spec :: Spec spec = parallel $ do - describe "getValue" $ do + describe "getBlocking" $ do it "returns the contents of the map" $ do om <- OM.new :: IO (OM.ObservableHashMap String String) - getValue om `shouldReturn` HM.empty + getBlocking om `shouldReturn` HM.empty -- Evaluate unit for coverage () <- OM.insert "key" "value" om - getValue om `shouldReturn` HM.singleton "key" "value" + getBlocking om `shouldReturn` HM.singleton "key" "value" OM.insert "key2" "value2" om - getValue om `shouldReturn` HM.fromList [("key", "value"), ("key2", "value2")] + getBlocking om `shouldReturn` HM.fromList [("key", "value"), ("key2", "value2")] describe "subscribe" $ do it "calls the callback with the contents of the map" $ do @@ -36,7 +37,7 @@ spec = parallel $ do OM.insert "key2" "value2" om lastCallbackShouldBe (Update, HM.fromList [("key", "value"), ("key2", "value2")]) - dispose subscriptionHandle + disposeBlocking subscriptionHandle lastCallbackShouldBe (Update, HM.fromList [("key", "value"), ("key2", "value2")]) OM.insert "key3" "value3" om @@ -58,7 +59,7 @@ spec = parallel $ do OM.insert "key2" "value2" om lastDeltaShouldBe $ Insert "key2" "value2" - dispose subscriptionHandle + disposeBlocking subscriptionHandle lastDeltaShouldBe $ Insert "key2" "value2" OM.insert "key3" "value3" om @@ -73,7 +74,7 @@ spec = parallel $ do OM.lookupDelete "key" om `shouldReturn` Just "changed" lastDeltaShouldBe $ Delete "key" - getValue om `shouldReturn` HM.singleton "key3" "value3" + getBlocking om `shouldReturn` HM.singleton "key3" "value3" describe "observeKey" $ do it "calls key callbacks with the correct value" $ do @@ -112,9 +113,8 @@ spec = parallel $ do v1ShouldBe $ (Update, Nothing) v2ShouldBe $ (Update, Just "changed") - getValue om `shouldReturn` HM.singleton "key2" "changed" - -- Evaluate unit for coverage - () <- dispose handle2 + getBlocking om `shouldReturn` HM.singleton "key2" "changed" + disposeBlocking handle2 OM.lookupDelete "key2" om `shouldReturn` (Just "changed") v2ShouldBe $ (Update, Just "changed") @@ -124,4 +124,4 @@ spec = parallel $ do OM.lookupDelete "key1" om `shouldReturn` Nothing v1ShouldBe $ (Update, Nothing) - getValue om `shouldReturn` HM.empty + getBlocking om `shouldReturn` HM.empty diff --git a/test/Quasar/Observable/ObservablePrioritySpec.hs b/test/Quasar/Observable/ObservablePrioritySpec.hs index ee10ddf62c00d3172e72b910ed0fd41039ebfc00..8aa81cc94760d2e7902ca6bb91380bd620eeaab9 100644 --- a/test/Quasar/Observable/ObservablePrioritySpec.hs +++ b/test/Quasar/Observable/ObservablePrioritySpec.hs @@ -1,5 +1,6 @@ module Quasar.Observable.ObservablePrioritySpec where +import Quasar.Core import Quasar.Observable import Quasar.Observable.ObservablePriority (ObservablePriority) import qualified Quasar.Observable.ObservablePriority as OP @@ -15,31 +16,31 @@ spec = do describe "ObservablePriority" $ parallel $ do it "can be created" $ do void $ OP.create - specify "getValue returns the value with the highest priority" $ do + specify "getBlocking returns the value with the highest priority" $ do (op :: ObservablePriority Int String) <- OP.create p2 <- OP.insertValue op 2 "p2" - getValue op `shouldReturn` (Just "p2") + getBlocking op `shouldReturn` Just "p2" p1 <- OP.insertValue op 1 "p1" - getValue op `shouldReturn` (Just "p2") - dispose p2 - getValue op `shouldReturn` (Just "p1") - dispose p1 - getValue op `shouldReturn` (Nothing) + getBlocking op `shouldReturn` Just "p2" + disposeBlocking p2 + getBlocking op `shouldReturn` Just "p1" + disposeBlocking p1 + getBlocking op `shouldReturn` Nothing it "sends updates when its value changes" $ do result <- newIORef [] let mostRecentShouldBe = (head <$> readIORef result `shouldReturn`) (op :: ObservablePriority Int String) <- OP.create _s <- subscribe op (modifyIORef result . (:)) - readIORef result `shouldReturn` ([(Current, Nothing)]) + readIORef result `shouldReturn` [(Current, Nothing)] p2 <- OP.insertValue op 2 "p2" mostRecentShouldBe (Update, Just "p2") p1 <- OP.insertValue op 1 "p1" mostRecentShouldBe (Update, Just "p2") - dispose p2 + disposeBlocking p2 mostRecentShouldBe (Update, Just "p1") - dispose p1 + disposeBlocking p1 mostRecentShouldBe (Update, Nothing) length <$> readIORef result `shouldReturn` 4 diff --git a/test/Quasar/ObservableSpec.hs b/test/Quasar/ObservableSpec.hs index f3a57dfe53724320dbfa1d52849c42c1f00ab1fe..d60ddd176902166bd5e63df822d07263273233e9 100644 --- a/test/Quasar/ObservableSpec.hs +++ b/test/Quasar/ObservableSpec.hs @@ -15,12 +15,12 @@ spec = do mergeObservableSpec :: Spec mergeObservableSpec = do describe "mergeObservable" $ parallel $ do - it "merges correctly using getValue" $ do + it "merges correctly using getBlocking" $ do a <- newObservableVar "" b <- newObservableVar "" - let mergedObservable = mergeObservable (\v0 v1 -> (v0, v1)) a b - let latestShouldBe = (getValue mergedObservable `shouldReturn`) + let mergedObservable = mergeObservable (,) a b + let latestShouldBe = (getBlocking mergedObservable `shouldReturn`) testSequence a b latestShouldBe