From 0ea39eca33fb0fe66316ffbce587eefe6ab46873 Mon Sep 17 00:00:00 2001 From: Jens Nolte <git@queezle.net> Date: Sun, 29 Aug 2021 19:28:01 +0200 Subject: [PATCH] Change `observe` signature Co-authored-by: Jan Beinke <git@janbeinke.com> --- src/Quasar/Observable.hs | 72 ++++++++++++++----- src/Quasar/Observable/Delta.hs | 2 + src/Quasar/Observable/ObservableHashMap.hs | 2 +- src/Quasar/Observable/ObservablePriority.hs | 2 +- .../Observable/ObservableHashMapSpec.hs | 6 +- .../Observable/ObservablePrioritySpec.hs | 2 +- test/Quasar/ObservableSpec.hs | 2 +- 7 files changed, 65 insertions(+), 23 deletions(-) diff --git a/src/Quasar/Observable.hs b/src/Quasar/Observable.hs index 4a7d525..4bebb34 100644 --- a/src/Quasar/Observable.hs +++ b/src/Quasar/Observable.hs @@ -7,6 +7,7 @@ module Quasar.Observable ( IsObservable(..), Observable(..), ObservableMessage(..), + asyncObserve, -- * ObservableVar ObservableVar, @@ -70,8 +71,38 @@ class IsRetrievable v a | a -> v where retrieveIO :: IsRetrievable v a => a -> IO v retrieveIO x = withOnResourceManager $ await =<< retrieve x +{-# DEPRECATED unsafeAsyncObserveIO "Old implementation of `observe`." #-} class IsRetrievable v o => IsObservable v o | o -> v where - observe :: o -> (ObservableMessage v -> IO ()) -> IO Disposable + observe :: (MonadAwait m, MonadCatch m, MonadResourceManager m) => o -> (ObservableMessage v -> m ()) -> m a + observe observable callback = do + msgVar <- liftIO $ newTVarIO ObservableLoading + idVar <- liftIO $ newTVarIO (0 :: Word64) + calledIdVar <- liftIO $ newTVarIO (0 :: Word64) + + bracketOnError + do + liftIO $ unsafeAsyncObserveIO observable \msg -> do + currentMessage <- atomically do + writeTVar msgVar msg + stateTVar idVar (dup . (+ 1)) + -- Wait for `callback` to complete + atomically do + readTVar calledIdVar >>= \called -> + unless (called >= currentMessage) retry + do awaitDispose + do + const $ forever do + (msgId, msg) <- liftIO $ atomically $ liftA2 (,) (readTVar idVar) (readTVar msgVar) + callback msg + liftIO $ atomically $ writeTVar calledIdVar msgId + + unsafeAsyncObserveIO :: o -> (ObservableMessage v -> IO ()) -> IO Disposable + unsafeAsyncObserveIO observable callback = do + resourceManager <- unsafeNewResourceManager + onResourceManager resourceManager do + asyncObserve observable (liftIO . callback) + + pure (toDisposable resourceManager) toObservable :: o -> Observable v toObservable = Observable @@ -79,6 +110,13 @@ class IsRetrievable v o => IsObservable v o | o -> v where mapObservable :: (v -> a) -> o -> Observable a mapObservable f = Observable . MappedObservable f + {-# MINIMAL observe | unsafeAsyncObserveIO #-} + + +asyncObserve :: IsObservable v o => MonadAsync m => o -> (ObservableMessage v -> m ()) -> m Disposable +asyncObserve observable callback = toDisposable <$> async (observe observable callback) + + -- | (TODO) Observe until the callback returns `False`. The callback will also be unsubscribed when the `ResourceManager` is disposed. observeWhile :: (IsObservable v o, MonadAsync m) => o -> (ObservableMessage v -> IO Bool) -> m Disposable observeWhile observable callback = do @@ -120,6 +158,7 @@ instance IsRetrievable v (Observable v) where retrieve (Observable o) = retrieve o instance IsObservable v (Observable v) where observe (Observable o) = observe o + unsafeAsyncObserveIO (Observable o) = unsafeAsyncObserveIO o toObservable = id mapObservable f (Observable o) = mapObservable f o @@ -156,6 +195,7 @@ instance IsRetrievable v (MappedObservable v) where retrieve (MappedObservable f observable) = f <<$>> retrieve observable instance IsObservable v (MappedObservable v) where observe (MappedObservable f observable) callback = observe observable (callback . fmap f) + unsafeAsyncObserveIO (MappedObservable f observable) callback = unsafeAsyncObserveIO observable (callback . fmap f) mapObservable f1 (MappedObservable f2 upstream) = Observable $ MappedObservable (f1 . f2) upstream @@ -168,8 +208,8 @@ instance IsRetrievable r (BindObservable r) where awaitResult $ retrieve $ fn x instance IsObservable r (BindObservable r) where - observe :: BindObservable r -> (ObservableMessage r -> IO ()) -> IO Disposable - observe (BindObservable fx fn) callback = do + unsafeAsyncObserveIO :: BindObservable r -> (ObservableMessage r -> IO ()) -> IO Disposable + unsafeAsyncObserveIO (BindObservable fx fn) callback = do -- Create a resource manager to ensure all subscriptions are cleaned up when disposing. resourceManager <- unsafeNewResourceManager @@ -177,7 +217,7 @@ instance IsObservable r (BindObservable r) where disposableVar <- newTMVarIO noDisposable keyVar <- newTMVarIO Nothing - leftDisposable <- observe fx (outerCallback resourceManager isDisposingVar disposableVar keyVar) + leftDisposable <- unsafeAsyncObserveIO fx (outerCallback resourceManager isDisposingVar disposableVar keyVar) attachDisposeAction_ resourceManager $ do atomically $ writeTVar isDisposingVar True @@ -214,7 +254,7 @@ instance IsObservable r (BindObservable r) where True -> pure $ pure () where - outerMessageHandler key (ObservableUpdate x) = observe (fn x) (innerCallback key) + outerMessageHandler key (ObservableUpdate x) = unsafeAsyncObserveIO (fn x) (innerCallback key) outerMessageHandler _ ObservableLoading = noDisposable <$ callback ObservableLoading outerMessageHandler _ (ObservableNotAvailable ex) = noDisposable <$ callback (ObservableNotAvailable ex) @@ -236,8 +276,8 @@ instance IsRetrievable r (CatchObservable e r) where awaitResult (retrieve fx) `catch` \ex -> awaitResult (retrieve (fn ex)) instance IsObservable r (CatchObservable e r) where - observe :: CatchObservable e r -> (ObservableMessage r -> IO ()) -> IO Disposable - observe (CatchObservable fx fn) callback = do + unsafeAsyncObserveIO :: CatchObservable e r -> (ObservableMessage r -> IO ()) -> IO Disposable + unsafeAsyncObserveIO (CatchObservable fx fn) callback = do -- Create a resource manager to ensure all subscriptions are cleaned up when disposing. resourceManager <- unsafeNewResourceManager @@ -245,7 +285,7 @@ instance IsObservable r (CatchObservable e r) where disposableVar <- newTMVarIO noDisposable keyVar <- newTMVarIO Nothing - leftDisposable <- observe fx (outerCallback resourceManager isDisposingVar disposableVar keyVar) + leftDisposable <- unsafeAsyncObserveIO fx (outerCallback resourceManager isDisposingVar disposableVar keyVar) attachDisposeAction_ resourceManager $ do atomically $ writeTVar isDisposingVar True @@ -282,7 +322,7 @@ instance IsObservable r (CatchObservable e r) where True -> pure $ pure () where - outerMessageHandler key (ObservableNotAvailable (fromException -> Just ex)) = observe (fn ex) (innerCallback key) + outerMessageHandler key (ObservableNotAvailable (fromException -> Just ex)) = unsafeAsyncObserveIO (fn ex) (innerCallback key) outerMessageHandler _ msg = noDisposable <$ callback msg innerCallback :: Unique -> ObservableMessage r -> IO () @@ -301,7 +341,7 @@ newtype ObservableVar v = ObservableVar (MVar (v, HM.HashMap Unique (ObservableC instance IsRetrievable v (ObservableVar v) where retrieve (ObservableVar mvar) = liftIO $ successfulTask . fst <$> readMVar mvar instance IsObservable v (ObservableVar v) where - observe (ObservableVar mvar) callback = do + unsafeAsyncObserveIO (ObservableVar mvar) callback = do key <- newUnique modifyMVar_ mvar $ \(state, subscribers) -> do -- Call listener @@ -356,11 +396,11 @@ data MergedObservable r o0 v0 o1 v1 = MergedObservable (v0 -> v1 -> r) o0 o1 instance forall r o0 v0 o1 v1. (IsRetrievable v0 o0, IsRetrievable v1 o1) => IsRetrievable r (MergedObservable r o0 v0 o1 v1) where retrieve (MergedObservable merge obs0 obs1) = liftA2 (liftA2 merge) (retrieve obs0) (retrieve obs1) instance forall r o0 v0 o1 v1. (IsObservable v0 o0, IsObservable v1 o1) => IsObservable r (MergedObservable r o0 v0 o1 v1) where - observe (MergedObservable merge obs0 obs1) callback = do + unsafeAsyncObserveIO (MergedObservable merge obs0 obs1) callback = do var0 <- newTVarIO Nothing var1 <- newTVarIO Nothing - d0 <- observe obs0 (mergeCallback var0 var1 . writeTVar var0 . Just) - d1 <- observe obs1 (mergeCallback var0 var1 . writeTVar var1 . Just) + d0 <- unsafeAsyncObserveIO obs0 (mergeCallback var0 var1 . writeTVar var0 . Just) + d1 <- unsafeAsyncObserveIO obs1 (mergeCallback var0 var1 . writeTVar var1 . Just) pure $ mconcat [d0, d1] where mergeCallback :: TVar (Maybe (ObservableMessage v0)) -> TVar (Maybe (ObservableMessage v1)) -> STM () -> IO () @@ -389,7 +429,7 @@ data FnObservable v = FnObservable { instance IsRetrievable v (FnObservable v) where retrieve o = retrieveFn o instance IsObservable v (FnObservable v) where - observe o = observeFn o + unsafeAsyncObserveIO o = observeFn o mapObservable f FnObservable{retrieveFn, observeFn} = Observable $ FnObservable { retrieveFn = f <<$>> retrieveFn, observeFn = \listener -> observeFn (listener . fmap f) @@ -417,7 +457,7 @@ newtype ConstObservable v = ConstObservable v instance IsRetrievable v (ConstObservable v) where retrieve (ConstObservable x) = pure $ pure x instance IsObservable v (ConstObservable v) where - observe (ConstObservable x) callback = do + unsafeAsyncObserveIO (ConstObservable x) callback = do callback $ ObservableUpdate x pure noDisposable @@ -426,7 +466,7 @@ newtype FailedObservable v = FailedObservable SomeException instance IsRetrievable v (FailedObservable v) where retrieve (FailedObservable ex) = liftIO $ throwIO ex instance IsObservable v (FailedObservable v) where - observe (FailedObservable ex) callback = do + unsafeAsyncObserveIO (FailedObservable ex) callback = do callback $ ObservableNotAvailable ex pure noDisposable diff --git a/src/Quasar/Observable/Delta.hs b/src/Quasar/Observable/Delta.hs index 8c099fa..377b195 100644 --- a/src/Quasar/Observable/Delta.hs +++ b/src/Quasar/Observable/Delta.hs @@ -50,6 +50,7 @@ instance IsRetrievable (HM.HashMap k v) (DeltaObservable k v) where retrieve (DeltaObservable o) = retrieve o instance IsObservable (HM.HashMap k v) (DeltaObservable k v) where observe (DeltaObservable o) = observe o + unsafeAsyncObserveIO (DeltaObservable o) = unsafeAsyncObserveIO o instance IsDeltaObservable k v (DeltaObservable k v) where subscribeDelta (DeltaObservable o) = subscribeDelta o instance Functor (DeltaObservable k) where @@ -61,5 +62,6 @@ instance IsRetrievable (HM.HashMap k b) (MappedDeltaObservable k b) where retrieve (MappedDeltaObservable f o) = fmap f <<$>> retrieve o instance IsObservable (HM.HashMap k b) (MappedDeltaObservable k b) where observe (MappedDeltaObservable f o) callback = observe o (callback . fmap (fmap f)) + unsafeAsyncObserveIO (MappedDeltaObservable f o) callback = unsafeAsyncObserveIO o (callback . fmap (fmap f)) instance IsDeltaObservable k b (MappedDeltaObservable k b) where subscribeDelta (MappedDeltaObservable f o) callback = subscribeDelta o (callback . fmap f) diff --git a/src/Quasar/Observable/ObservableHashMap.hs b/src/Quasar/Observable/ObservableHashMap.hs index eeb9db7..ef99874 100644 --- a/src/Quasar/Observable/ObservableHashMap.hs +++ b/src/Quasar/Observable/ObservableHashMap.hs @@ -39,7 +39,7 @@ makeLensesWith (lensField .~ (\_ _ -> pure . TopName . mkName . ("_" <>) . nameB instance IsRetrievable (HM.HashMap k v) (ObservableHashMap k v) where retrieve (ObservableHashMap mvar) = liftIO $ pure . HM.mapMaybe value . keyHandles <$> readMVar mvar instance IsObservable (HM.HashMap k v) (ObservableHashMap k v) where - observe ohm callback = liftIO $ modifyHandle update ohm + unsafeAsyncObserveIO ohm callback = liftIO $ modifyHandle update ohm where update :: Handle k v -> IO (Handle k v, Disposable) update handle = do diff --git a/src/Quasar/Observable/ObservablePriority.hs b/src/Quasar/Observable/ObservablePriority.hs index 55acf14..82c9c08 100644 --- a/src/Quasar/Observable/ObservablePriority.hs +++ b/src/Quasar/Observable/ObservablePriority.hs @@ -25,7 +25,7 @@ instance IsRetrievable (Maybe v) (ObservablePriority p v) where getValueFromInternals Internals{current=Nothing} = Nothing getValueFromInternals Internals{current=Just (_, _, value)} = Just value instance IsObservable (Maybe v) (ObservablePriority p v) where - observe (ObservablePriority mvar) callback = do + unsafeAsyncObserveIO (ObservablePriority mvar) callback = do key <- newUnique modifyMVar_ mvar $ \internals@Internals{subscribers} -> do -- Call listener diff --git a/test/Quasar/Observable/ObservableHashMapSpec.hs b/test/Quasar/Observable/ObservableHashMapSpec.hs index d69d1c1..85846d9 100644 --- a/test/Quasar/Observable/ObservableHashMapSpec.hs +++ b/test/Quasar/Observable/ObservableHashMapSpec.hs @@ -28,7 +28,7 @@ spec = parallel $ do lastCallbackValue <- newIORef undefined om <- OM.new :: IO (OM.ObservableHashMap String String) - subscriptionHandle <- observe om $ writeIORef lastCallbackValue + subscriptionHandle <- unsafeAsyncObserveIO om $ writeIORef lastCallbackValue let lastCallbackShouldBe expected = do (ObservableUpdate update) <- readIORef lastCallbackValue update `shouldBe` expected @@ -85,7 +85,7 @@ spec = parallel $ do om <- OM.new :: IO (OM.ObservableHashMap String String) - void $ observe (OM.observeKey "key1" om) (writeIORef value1) + void $ unsafeAsyncObserveIO (OM.observeKey "key1" om) (writeIORef value1) let v1ShouldBe expected = do (ObservableUpdate update) <- readIORef value1 update `shouldBe` expected @@ -98,7 +98,7 @@ spec = parallel $ do OM.insert "key2" "value2" om v1ShouldBe $ Just "value1" - handle2 <- observe (OM.observeKey "key2" om) (writeIORef value2) + handle2 <- unsafeAsyncObserveIO (OM.observeKey "key2" om) (writeIORef value2) let v2ShouldBe expected = do (ObservableUpdate update) <- readIORef value2 update `shouldBe` expected diff --git a/test/Quasar/Observable/ObservablePrioritySpec.hs b/test/Quasar/Observable/ObservablePrioritySpec.hs index 003a0df..ba57c93 100644 --- a/test/Quasar/Observable/ObservablePrioritySpec.hs +++ b/test/Quasar/Observable/ObservablePrioritySpec.hs @@ -33,7 +33,7 @@ spec = do x `shouldBe` expected (op :: ObservablePriority Int String) <- OP.create - _s <- observe op (modifyIORef result . (:)) + _s <- unsafeAsyncObserveIO op (modifyIORef result . (:)) mostRecentShouldBe Nothing p2 <- OP.insertValue op 2 "p2" diff --git a/test/Quasar/ObservableSpec.hs b/test/Quasar/ObservableSpec.hs index 0183393..0ec52eb 100644 --- a/test/Quasar/ObservableSpec.hs +++ b/test/Quasar/ObservableSpec.hs @@ -30,7 +30,7 @@ mergeObservableSpec = do let mergedObservable = mergeObservable (,) a b (latestRef :: IORef (ObservableMessage (String, String))) <- newIORef (ObservableUpdate ("", "")) - void $ observe mergedObservable (writeIORef latestRef) + void $ unsafeAsyncObserveIO mergedObservable (writeIORef latestRef) let latestShouldBe expected = do (ObservableUpdate x) <- readIORef latestRef x `shouldBe` expected -- GitLab