diff --git a/src/Quasar/Observable.hs b/src/Quasar/Observable.hs index 4a7d52555aae0a97a10af7e67dee70cdeb0d48e8..4bebb340f94968c2dffcde343019ef9cd3d96d7b 100644 --- a/src/Quasar/Observable.hs +++ b/src/Quasar/Observable.hs @@ -7,6 +7,7 @@ module Quasar.Observable ( IsObservable(..), Observable(..), ObservableMessage(..), + asyncObserve, -- * ObservableVar ObservableVar, @@ -70,8 +71,38 @@ class IsRetrievable v a | a -> v where retrieveIO :: IsRetrievable v a => a -> IO v retrieveIO x = withOnResourceManager $ await =<< retrieve x +{-# DEPRECATED unsafeAsyncObserveIO "Old implementation of `observe`." #-} class IsRetrievable v o => IsObservable v o | o -> v where - observe :: o -> (ObservableMessage v -> IO ()) -> IO Disposable + observe :: (MonadAwait m, MonadCatch m, MonadResourceManager m) => o -> (ObservableMessage v -> m ()) -> m a + observe observable callback = do + msgVar <- liftIO $ newTVarIO ObservableLoading + idVar <- liftIO $ newTVarIO (0 :: Word64) + calledIdVar <- liftIO $ newTVarIO (0 :: Word64) + + bracketOnError + do + liftIO $ unsafeAsyncObserveIO observable \msg -> do + currentMessage <- atomically do + writeTVar msgVar msg + stateTVar idVar (dup . (+ 1)) + -- Wait for `callback` to complete + atomically do + readTVar calledIdVar >>= \called -> + unless (called >= currentMessage) retry + do awaitDispose + do + const $ forever do + (msgId, msg) <- liftIO $ atomically $ liftA2 (,) (readTVar idVar) (readTVar msgVar) + callback msg + liftIO $ atomically $ writeTVar calledIdVar msgId + + unsafeAsyncObserveIO :: o -> (ObservableMessage v -> IO ()) -> IO Disposable + unsafeAsyncObserveIO observable callback = do + resourceManager <- unsafeNewResourceManager + onResourceManager resourceManager do + asyncObserve observable (liftIO . callback) + + pure (toDisposable resourceManager) toObservable :: o -> Observable v toObservable = Observable @@ -79,6 +110,13 @@ class IsRetrievable v o => IsObservable v o | o -> v where mapObservable :: (v -> a) -> o -> Observable a mapObservable f = Observable . MappedObservable f + {-# MINIMAL observe | unsafeAsyncObserveIO #-} + + +asyncObserve :: IsObservable v o => MonadAsync m => o -> (ObservableMessage v -> m ()) -> m Disposable +asyncObserve observable callback = toDisposable <$> async (observe observable callback) + + -- | (TODO) Observe until the callback returns `False`. The callback will also be unsubscribed when the `ResourceManager` is disposed. observeWhile :: (IsObservable v o, MonadAsync m) => o -> (ObservableMessage v -> IO Bool) -> m Disposable observeWhile observable callback = do @@ -120,6 +158,7 @@ instance IsRetrievable v (Observable v) where retrieve (Observable o) = retrieve o instance IsObservable v (Observable v) where observe (Observable o) = observe o + unsafeAsyncObserveIO (Observable o) = unsafeAsyncObserveIO o toObservable = id mapObservable f (Observable o) = mapObservable f o @@ -156,6 +195,7 @@ instance IsRetrievable v (MappedObservable v) where retrieve (MappedObservable f observable) = f <<$>> retrieve observable instance IsObservable v (MappedObservable v) where observe (MappedObservable f observable) callback = observe observable (callback . fmap f) + unsafeAsyncObserveIO (MappedObservable f observable) callback = unsafeAsyncObserveIO observable (callback . fmap f) mapObservable f1 (MappedObservable f2 upstream) = Observable $ MappedObservable (f1 . f2) upstream @@ -168,8 +208,8 @@ instance IsRetrievable r (BindObservable r) where awaitResult $ retrieve $ fn x instance IsObservable r (BindObservable r) where - observe :: BindObservable r -> (ObservableMessage r -> IO ()) -> IO Disposable - observe (BindObservable fx fn) callback = do + unsafeAsyncObserveIO :: BindObservable r -> (ObservableMessage r -> IO ()) -> IO Disposable + unsafeAsyncObserveIO (BindObservable fx fn) callback = do -- Create a resource manager to ensure all subscriptions are cleaned up when disposing. resourceManager <- unsafeNewResourceManager @@ -177,7 +217,7 @@ instance IsObservable r (BindObservable r) where disposableVar <- newTMVarIO noDisposable keyVar <- newTMVarIO Nothing - leftDisposable <- observe fx (outerCallback resourceManager isDisposingVar disposableVar keyVar) + leftDisposable <- unsafeAsyncObserveIO fx (outerCallback resourceManager isDisposingVar disposableVar keyVar) attachDisposeAction_ resourceManager $ do atomically $ writeTVar isDisposingVar True @@ -214,7 +254,7 @@ instance IsObservable r (BindObservable r) where True -> pure $ pure () where - outerMessageHandler key (ObservableUpdate x) = observe (fn x) (innerCallback key) + outerMessageHandler key (ObservableUpdate x) = unsafeAsyncObserveIO (fn x) (innerCallback key) outerMessageHandler _ ObservableLoading = noDisposable <$ callback ObservableLoading outerMessageHandler _ (ObservableNotAvailable ex) = noDisposable <$ callback (ObservableNotAvailable ex) @@ -236,8 +276,8 @@ instance IsRetrievable r (CatchObservable e r) where awaitResult (retrieve fx) `catch` \ex -> awaitResult (retrieve (fn ex)) instance IsObservable r (CatchObservable e r) where - observe :: CatchObservable e r -> (ObservableMessage r -> IO ()) -> IO Disposable - observe (CatchObservable fx fn) callback = do + unsafeAsyncObserveIO :: CatchObservable e r -> (ObservableMessage r -> IO ()) -> IO Disposable + unsafeAsyncObserveIO (CatchObservable fx fn) callback = do -- Create a resource manager to ensure all subscriptions are cleaned up when disposing. resourceManager <- unsafeNewResourceManager @@ -245,7 +285,7 @@ instance IsObservable r (CatchObservable e r) where disposableVar <- newTMVarIO noDisposable keyVar <- newTMVarIO Nothing - leftDisposable <- observe fx (outerCallback resourceManager isDisposingVar disposableVar keyVar) + leftDisposable <- unsafeAsyncObserveIO fx (outerCallback resourceManager isDisposingVar disposableVar keyVar) attachDisposeAction_ resourceManager $ do atomically $ writeTVar isDisposingVar True @@ -282,7 +322,7 @@ instance IsObservable r (CatchObservable e r) where True -> pure $ pure () where - outerMessageHandler key (ObservableNotAvailable (fromException -> Just ex)) = observe (fn ex) (innerCallback key) + outerMessageHandler key (ObservableNotAvailable (fromException -> Just ex)) = unsafeAsyncObserveIO (fn ex) (innerCallback key) outerMessageHandler _ msg = noDisposable <$ callback msg innerCallback :: Unique -> ObservableMessage r -> IO () @@ -301,7 +341,7 @@ newtype ObservableVar v = ObservableVar (MVar (v, HM.HashMap Unique (ObservableC instance IsRetrievable v (ObservableVar v) where retrieve (ObservableVar mvar) = liftIO $ successfulTask . fst <$> readMVar mvar instance IsObservable v (ObservableVar v) where - observe (ObservableVar mvar) callback = do + unsafeAsyncObserveIO (ObservableVar mvar) callback = do key <- newUnique modifyMVar_ mvar $ \(state, subscribers) -> do -- Call listener @@ -356,11 +396,11 @@ data MergedObservable r o0 v0 o1 v1 = MergedObservable (v0 -> v1 -> r) o0 o1 instance forall r o0 v0 o1 v1. (IsRetrievable v0 o0, IsRetrievable v1 o1) => IsRetrievable r (MergedObservable r o0 v0 o1 v1) where retrieve (MergedObservable merge obs0 obs1) = liftA2 (liftA2 merge) (retrieve obs0) (retrieve obs1) instance forall r o0 v0 o1 v1. (IsObservable v0 o0, IsObservable v1 o1) => IsObservable r (MergedObservable r o0 v0 o1 v1) where - observe (MergedObservable merge obs0 obs1) callback = do + unsafeAsyncObserveIO (MergedObservable merge obs0 obs1) callback = do var0 <- newTVarIO Nothing var1 <- newTVarIO Nothing - d0 <- observe obs0 (mergeCallback var0 var1 . writeTVar var0 . Just) - d1 <- observe obs1 (mergeCallback var0 var1 . writeTVar var1 . Just) + d0 <- unsafeAsyncObserveIO obs0 (mergeCallback var0 var1 . writeTVar var0 . Just) + d1 <- unsafeAsyncObserveIO obs1 (mergeCallback var0 var1 . writeTVar var1 . Just) pure $ mconcat [d0, d1] where mergeCallback :: TVar (Maybe (ObservableMessage v0)) -> TVar (Maybe (ObservableMessage v1)) -> STM () -> IO () @@ -389,7 +429,7 @@ data FnObservable v = FnObservable { instance IsRetrievable v (FnObservable v) where retrieve o = retrieveFn o instance IsObservable v (FnObservable v) where - observe o = observeFn o + unsafeAsyncObserveIO o = observeFn o mapObservable f FnObservable{retrieveFn, observeFn} = Observable $ FnObservable { retrieveFn = f <<$>> retrieveFn, observeFn = \listener -> observeFn (listener . fmap f) @@ -417,7 +457,7 @@ newtype ConstObservable v = ConstObservable v instance IsRetrievable v (ConstObservable v) where retrieve (ConstObservable x) = pure $ pure x instance IsObservable v (ConstObservable v) where - observe (ConstObservable x) callback = do + unsafeAsyncObserveIO (ConstObservable x) callback = do callback $ ObservableUpdate x pure noDisposable @@ -426,7 +466,7 @@ newtype FailedObservable v = FailedObservable SomeException instance IsRetrievable v (FailedObservable v) where retrieve (FailedObservable ex) = liftIO $ throwIO ex instance IsObservable v (FailedObservable v) where - observe (FailedObservable ex) callback = do + unsafeAsyncObserveIO (FailedObservable ex) callback = do callback $ ObservableNotAvailable ex pure noDisposable diff --git a/src/Quasar/Observable/Delta.hs b/src/Quasar/Observable/Delta.hs index 8c099fa1e93eb3a1e08c2ba34956f0062c17116b..377b195a7087f942dddef1c282553ecf4fd6f6a8 100644 --- a/src/Quasar/Observable/Delta.hs +++ b/src/Quasar/Observable/Delta.hs @@ -50,6 +50,7 @@ instance IsRetrievable (HM.HashMap k v) (DeltaObservable k v) where retrieve (DeltaObservable o) = retrieve o instance IsObservable (HM.HashMap k v) (DeltaObservable k v) where observe (DeltaObservable o) = observe o + unsafeAsyncObserveIO (DeltaObservable o) = unsafeAsyncObserveIO o instance IsDeltaObservable k v (DeltaObservable k v) where subscribeDelta (DeltaObservable o) = subscribeDelta o instance Functor (DeltaObservable k) where @@ -61,5 +62,6 @@ instance IsRetrievable (HM.HashMap k b) (MappedDeltaObservable k b) where retrieve (MappedDeltaObservable f o) = fmap f <<$>> retrieve o instance IsObservable (HM.HashMap k b) (MappedDeltaObservable k b) where observe (MappedDeltaObservable f o) callback = observe o (callback . fmap (fmap f)) + unsafeAsyncObserveIO (MappedDeltaObservable f o) callback = unsafeAsyncObserveIO o (callback . fmap (fmap f)) instance IsDeltaObservable k b (MappedDeltaObservable k b) where subscribeDelta (MappedDeltaObservable f o) callback = subscribeDelta o (callback . fmap f) diff --git a/src/Quasar/Observable/ObservableHashMap.hs b/src/Quasar/Observable/ObservableHashMap.hs index eeb9db74c73ea26c25842b77286721aab262dc86..ef9987402b3a9e778ed34a2e3506be6e99da812e 100644 --- a/src/Quasar/Observable/ObservableHashMap.hs +++ b/src/Quasar/Observable/ObservableHashMap.hs @@ -39,7 +39,7 @@ makeLensesWith (lensField .~ (\_ _ -> pure . TopName . mkName . ("_" <>) . nameB instance IsRetrievable (HM.HashMap k v) (ObservableHashMap k v) where retrieve (ObservableHashMap mvar) = liftIO $ pure . HM.mapMaybe value . keyHandles <$> readMVar mvar instance IsObservable (HM.HashMap k v) (ObservableHashMap k v) where - observe ohm callback = liftIO $ modifyHandle update ohm + unsafeAsyncObserveIO ohm callback = liftIO $ modifyHandle update ohm where update :: Handle k v -> IO (Handle k v, Disposable) update handle = do diff --git a/src/Quasar/Observable/ObservablePriority.hs b/src/Quasar/Observable/ObservablePriority.hs index 55acf14cf3a5e639c7b7078eda37ec6722280b7a..82c9c08f804e2606be2ee1f9f2cde7dd58fb9d6b 100644 --- a/src/Quasar/Observable/ObservablePriority.hs +++ b/src/Quasar/Observable/ObservablePriority.hs @@ -25,7 +25,7 @@ instance IsRetrievable (Maybe v) (ObservablePriority p v) where getValueFromInternals Internals{current=Nothing} = Nothing getValueFromInternals Internals{current=Just (_, _, value)} = Just value instance IsObservable (Maybe v) (ObservablePriority p v) where - observe (ObservablePriority mvar) callback = do + unsafeAsyncObserveIO (ObservablePriority mvar) callback = do key <- newUnique modifyMVar_ mvar $ \internals@Internals{subscribers} -> do -- Call listener diff --git a/test/Quasar/Observable/ObservableHashMapSpec.hs b/test/Quasar/Observable/ObservableHashMapSpec.hs index d69d1c142b8bfba4b872092f6d241f9242525bd3..85846d960582f5b1090248ccaca57bdf053e312d 100644 --- a/test/Quasar/Observable/ObservableHashMapSpec.hs +++ b/test/Quasar/Observable/ObservableHashMapSpec.hs @@ -28,7 +28,7 @@ spec = parallel $ do lastCallbackValue <- newIORef undefined om <- OM.new :: IO (OM.ObservableHashMap String String) - subscriptionHandle <- observe om $ writeIORef lastCallbackValue + subscriptionHandle <- unsafeAsyncObserveIO om $ writeIORef lastCallbackValue let lastCallbackShouldBe expected = do (ObservableUpdate update) <- readIORef lastCallbackValue update `shouldBe` expected @@ -85,7 +85,7 @@ spec = parallel $ do om <- OM.new :: IO (OM.ObservableHashMap String String) - void $ observe (OM.observeKey "key1" om) (writeIORef value1) + void $ unsafeAsyncObserveIO (OM.observeKey "key1" om) (writeIORef value1) let v1ShouldBe expected = do (ObservableUpdate update) <- readIORef value1 update `shouldBe` expected @@ -98,7 +98,7 @@ spec = parallel $ do OM.insert "key2" "value2" om v1ShouldBe $ Just "value1" - handle2 <- observe (OM.observeKey "key2" om) (writeIORef value2) + handle2 <- unsafeAsyncObserveIO (OM.observeKey "key2" om) (writeIORef value2) let v2ShouldBe expected = do (ObservableUpdate update) <- readIORef value2 update `shouldBe` expected diff --git a/test/Quasar/Observable/ObservablePrioritySpec.hs b/test/Quasar/Observable/ObservablePrioritySpec.hs index 003a0df825f2ecbf2111cbbcf17e25348bd97254..ba57c9359b59cd8db7c3e662d576f45bd52f53a4 100644 --- a/test/Quasar/Observable/ObservablePrioritySpec.hs +++ b/test/Quasar/Observable/ObservablePrioritySpec.hs @@ -33,7 +33,7 @@ spec = do x `shouldBe` expected (op :: ObservablePriority Int String) <- OP.create - _s <- observe op (modifyIORef result . (:)) + _s <- unsafeAsyncObserveIO op (modifyIORef result . (:)) mostRecentShouldBe Nothing p2 <- OP.insertValue op 2 "p2" diff --git a/test/Quasar/ObservableSpec.hs b/test/Quasar/ObservableSpec.hs index 01833938ac3e75c0d415864093ab19f8d6618718..0ec52eb42cf36728fe31f9080ce3ee2694fda9b2 100644 --- a/test/Quasar/ObservableSpec.hs +++ b/test/Quasar/ObservableSpec.hs @@ -30,7 +30,7 @@ mergeObservableSpec = do let mergedObservable = mergeObservable (,) a b (latestRef :: IORef (ObservableMessage (String, String))) <- newIORef (ObservableUpdate ("", "")) - void $ observe mergedObservable (writeIORef latestRef) + void $ unsafeAsyncObserveIO mergedObservable (writeIORef latestRef) let latestShouldBe expected = do (ObservableUpdate x) <- readIORef latestRef x `shouldBe` expected