diff --git a/src/Quasar/Observable.hs b/src/Quasar/Observable.hs index e8faa5d6f69b28113748a845281bc11f17d483c0..c38b16d50de93fb0f9674e381b5276cb92447497 100644 --- a/src/Quasar/Observable.hs +++ b/src/Quasar/Observable.hs @@ -9,6 +9,7 @@ module Quasar.Observable ( ObservableMessage(..), toObservableUpdate, asyncObserve, + observeBlocking, -- * ObservableVar ObservableVar, @@ -79,49 +80,11 @@ class IsRetrievable v a | a -> v where retrieveIO :: IsRetrievable v a => a -> IO v retrieveIO x = withOnResourceManager $ await =<< retrieve x -{-# DEPRECATED unsafeAsyncObserveIO "Old implementation of `observe`." #-} +{-# DEPRECATED oldObserve "Old implementation of `observe`." #-} class IsRetrievable v o => IsObservable v o | o -> v where - observe :: MonadResourceManager m => o -> (ObservableMessage v -> m ()) -> m a - observe observable callback = do - msgVar <- liftIO $ newTVarIO ObservableLoading - idVar <- liftIO $ newTVarIO (0 :: Word64) - calledIdVar <- liftIO $ newTVarIO (0 :: Word64) - completedVar <- newAsyncVar - - resourceManager <- askResourceManager - finally - do - bracketOnError - do - -- This implementation is a temporary compatability wrapper and forking isn't necessary with the new design. - forkTask do - attachDisposable resourceManager =<< liftIO do - unsafeAsyncObserveIO observable \msg -> do - currentMsgId <- atomically do - writeTVar msgVar msg - stateTVar idVar (dup . (+ 1)) - -- Wait for `callback` to complete - awaitAny2 - do toAwaitable completedVar - do - unsafeAwaitSTM do - readTVar calledIdVar >>= \calledId -> - unless (calledId >= currentMsgId) retry - do disposeAndAwait - do - const $ forever do - (msgId, msg) <- liftIO $ atomically do - msgAvailable <- liftA2 (>) (readTVar idVar) (readTVar calledIdVar) - unless msgAvailable retry - liftA2 (,) (readTVar idVar) (readTVar msgVar) - callback msg - liftIO $ atomically $ writeTVar calledIdVar msgId - do - putAsyncVar completedVar () - - unsafeAsyncObserveIO :: o -> (ObservableMessage v -> IO ()) -> IO Disposable - unsafeAsyncObserveIO observable callback = do - forkTask_ $ withOnResourceManager $ observe observable (liftIO . callback) + oldObserve :: o -> (ObservableMessage v -> IO ()) -> IO Disposable + oldObserve observable callback = do + forkTask_ $ withOnResourceManager $ observeBlocking observable (liftIO . callback) toObservable :: o -> Observable v toObservable = Observable @@ -129,11 +92,48 @@ class IsRetrievable v o => IsObservable v o | o -> v where mapObservable :: (v -> a) -> o -> Observable a mapObservable f = Observable . MappedObservable f - {-# MINIMAL observe | unsafeAsyncObserveIO #-} + +observeBlocking :: (IsObservable v o, MonadResourceManager m) => o -> (ObservableMessage v -> m ()) -> m a +observeBlocking observable callback = do + msgVar <- liftIO $ newTVarIO ObservableLoading + idVar <- liftIO $ newTVarIO (0 :: Word64) + calledIdVar <- liftIO $ newTVarIO (0 :: Word64) + completedVar <- newAsyncVar + + resourceManager <- askResourceManager + finally + do + bracketOnError + do + -- This implementation is a temporary compatability wrapper and forking isn't necessary with the new design. + forkTask do + attachDisposable resourceManager =<< liftIO do + oldObserve observable \msg -> do + currentMsgId <- atomically do + writeTVar msgVar msg + stateTVar idVar (dup . (+ 1)) + -- Wait for `callback` to complete + awaitAny2 + do toAwaitable completedVar + do + unsafeAwaitSTM do + readTVar calledIdVar >>= \calledId -> + unless (calledId >= currentMsgId) retry + do disposeAndAwait + do + const $ forever do + (msgId, msg) <- liftIO $ atomically do + msgAvailable <- liftA2 (>) (readTVar idVar) (readTVar calledIdVar) + unless msgAvailable retry + liftA2 (,) (readTVar idVar) (readTVar msgVar) + callback msg + liftIO $ atomically $ writeTVar calledIdVar msgId + do + putAsyncVar completedVar () asyncObserve :: IsObservable v o => MonadAsync m => o -> (ObservableMessage v -> m ()) -> m () -asyncObserve observable callback = async_ (observe observable callback) +asyncObserve observable callback = async_ (observeBlocking observable callback) data ObserveWhileCompleted = ObserveWhileCompleted @@ -160,7 +160,7 @@ observeWhile_ :: (IsObservable v o, MonadResourceManager m) => o -> (ObservableM observeWhile_ observable callback = catch do - observe observable \msg -> do + observeBlocking observable \msg -> do continue <- callback msg unless continue $ throwM ObserveWhileCompleted \ObserveWhileCompleted -> pure () @@ -174,8 +174,7 @@ data Observable v = forall o. IsObservable v o => Observable o instance IsRetrievable v (Observable v) where retrieve (Observable o) = retrieve o instance IsObservable v (Observable v) where - observe (Observable o) = observe o - unsafeAsyncObserveIO (Observable o) = unsafeAsyncObserveIO o + oldObserve (Observable o) = oldObserve o toObservable = id mapObservable f (Observable o) = mapObservable f o @@ -211,8 +210,7 @@ data MappedObservable b = forall a o. IsObservable a o => MappedObservable (a -> instance IsRetrievable v (MappedObservable v) where retrieve (MappedObservable f observable) = f <<$>> retrieve observable instance IsObservable v (MappedObservable v) where - observe (MappedObservable f observable) callback = observe observable (callback . fmap f) - unsafeAsyncObserveIO (MappedObservable f observable) callback = unsafeAsyncObserveIO observable (callback . fmap f) + oldObserve (MappedObservable f observable) callback = oldObserve observable (callback . fmap f) mapObservable f1 (MappedObservable f2 upstream) = Observable $ MappedObservable (f1 . f2) upstream @@ -225,8 +223,8 @@ instance IsRetrievable r (BindObservable r) where retrieve $ fn x instance IsObservable r (BindObservable r) where - unsafeAsyncObserveIO :: BindObservable r -> (ObservableMessage r -> IO ()) -> IO Disposable - unsafeAsyncObserveIO (BindObservable fx fn) callback = do + oldObserve :: BindObservable r -> (ObservableMessage r -> IO ()) -> IO Disposable + oldObserve (BindObservable fx fn) callback = do -- Create a resource manager to ensure all subscriptions are cleaned up when disposing. resourceManager <- unsafeNewResourceManager @@ -234,7 +232,7 @@ instance IsObservable r (BindObservable r) where disposableVar <- newTMVarIO noDisposable keyVar <- newTMVarIO Nothing - leftDisposable <- unsafeAsyncObserveIO fx (outerCallback resourceManager isDisposingVar disposableVar keyVar) + leftDisposable <- oldObserve fx (outerCallback resourceManager isDisposingVar disposableVar keyVar) attachDisposeAction_ resourceManager $ do atomically $ writeTVar isDisposingVar True @@ -272,7 +270,7 @@ instance IsObservable r (BindObservable r) where True -> pure $ pure () where - outerMessageHandler key (ObservableUpdate x) = unsafeAsyncObserveIO (fn x) (innerCallback key) + outerMessageHandler key (ObservableUpdate x) = oldObserve (fn x) (innerCallback key) outerMessageHandler _ ObservableLoading = noDisposable <$ callback ObservableLoading outerMessageHandler _ (ObservableNotAvailable ex) = noDisposable <$ callback (ObservableNotAvailable ex) @@ -293,8 +291,8 @@ instance IsRetrievable r (CatchObservable e r) where retrieve (CatchObservable fx fn) = retrieve fx `catch` \ex -> retrieve (fn ex) instance IsObservable r (CatchObservable e r) where - unsafeAsyncObserveIO :: CatchObservable e r -> (ObservableMessage r -> IO ()) -> IO Disposable - unsafeAsyncObserveIO (CatchObservable fx fn) callback = do + oldObserve :: CatchObservable e r -> (ObservableMessage r -> IO ()) -> IO Disposable + oldObserve (CatchObservable fx fn) callback = do -- Create a resource manager to ensure all subscriptions are cleaned up when disposing. resourceManager <- unsafeNewResourceManager @@ -302,7 +300,7 @@ instance IsObservable r (CatchObservable e r) where disposableVar <- newTMVarIO noDisposable keyVar <- newTMVarIO Nothing - leftDisposable <- unsafeAsyncObserveIO fx (outerCallback resourceManager isDisposingVar disposableVar keyVar) + leftDisposable <- oldObserve fx (outerCallback resourceManager isDisposingVar disposableVar keyVar) attachDisposeAction_ resourceManager $ do atomically $ writeTVar isDisposingVar True @@ -340,7 +338,7 @@ instance IsObservable r (CatchObservable e r) where True -> pure $ pure () where - outerMessageHandler key (ObservableNotAvailable (fromException -> Just ex)) = unsafeAsyncObserveIO (fn ex) (innerCallback key) + outerMessageHandler key (ObservableNotAvailable (fromException -> Just ex)) = oldObserve (fn ex) (innerCallback key) outerMessageHandler _ msg = noDisposable <$ callback msg innerCallback :: Unique -> ObservableMessage r -> IO () @@ -359,7 +357,7 @@ newtype ObservableVar v = ObservableVar (MVar (v, HM.HashMap Unique (ObservableC instance IsRetrievable v (ObservableVar v) where retrieve (ObservableVar mvar) = liftIO $ pure . fst <$> readMVar mvar instance IsObservable v (ObservableVar v) where - unsafeAsyncObserveIO (ObservableVar mvar) callback = do + oldObserve (ObservableVar mvar) callback = do key <- newUnique modifyMVar_ mvar $ \(state, subscribers) -> do -- Call listener @@ -417,11 +415,11 @@ data MergedObservable r o0 v0 o1 v1 = MergedObservable (v0 -> v1 -> r) o0 o1 instance forall r o0 v0 o1 v1. (IsRetrievable v0 o0, IsRetrievable v1 o1) => IsRetrievable r (MergedObservable r o0 v0 o1 v1) where retrieve (MergedObservable merge obs0 obs1) = liftA2 (liftA2 merge) (retrieve obs0) (retrieve obs1) instance forall r o0 v0 o1 v1. (IsObservable v0 o0, IsObservable v1 o1) => IsObservable r (MergedObservable r o0 v0 o1 v1) where - unsafeAsyncObserveIO (MergedObservable merge obs0 obs1) callback = do + oldObserve (MergedObservable merge obs0 obs1) callback = do var0 <- newTVarIO Nothing var1 <- newTVarIO Nothing - d0 <- unsafeAsyncObserveIO obs0 (mergeCallback var0 var1 . writeTVar var0 . Just) - d1 <- unsafeAsyncObserveIO obs1 (mergeCallback var0 var1 . writeTVar var1 . Just) + d0 <- oldObserve obs0 (mergeCallback var0 var1 . writeTVar var0 . Just) + d1 <- oldObserve obs1 (mergeCallback var0 var1 . writeTVar var1 . Just) pure $ mconcat [d0, d1] where mergeCallback :: TVar (Maybe (ObservableMessage v0)) -> TVar (Maybe (ObservableMessage v1)) -> STM () -> IO () @@ -450,7 +448,7 @@ data FnObservable v = FnObservable { instance IsRetrievable v (FnObservable v) where retrieve o = retrieveFn o instance IsObservable v (FnObservable v) where - unsafeAsyncObserveIO o = observeFn o + oldObserve o = observeFn o mapObservable f FnObservable{retrieveFn, observeFn} = Observable $ FnObservable { retrieveFn = f <<$>> retrieveFn, observeFn = \listener -> observeFn (listener . fmap f) @@ -478,7 +476,7 @@ newtype ConstObservable v = ConstObservable v instance IsRetrievable v (ConstObservable v) where retrieve (ConstObservable x) = pure $ pure x instance IsObservable v (ConstObservable v) where - unsafeAsyncObserveIO (ConstObservable x) callback = do + oldObserve (ConstObservable x) callback = do callback $ ObservableUpdate x pure noDisposable @@ -487,7 +485,7 @@ newtype FailedObservable v = FailedObservable SomeException instance IsRetrievable v (FailedObservable v) where retrieve (FailedObservable ex) = liftIO $ throwIO ex instance IsObservable v (FailedObservable v) where - unsafeAsyncObserveIO (FailedObservable ex) callback = do + oldObserve (FailedObservable ex) callback = do callback $ ObservableNotAvailable ex pure noDisposable diff --git a/src/Quasar/Observable/Delta.hs b/src/Quasar/Observable/Delta.hs index 377b195a7087f942dddef1c282553ecf4fd6f6a8..ea142c3557bfce5ab48dbd9e4de11683af511415 100644 --- a/src/Quasar/Observable/Delta.hs +++ b/src/Quasar/Observable/Delta.hs @@ -49,8 +49,7 @@ data DeltaObservable k v = forall o. IsDeltaObservable k v o => DeltaObservable instance IsRetrievable (HM.HashMap k v) (DeltaObservable k v) where retrieve (DeltaObservable o) = retrieve o instance IsObservable (HM.HashMap k v) (DeltaObservable k v) where - observe (DeltaObservable o) = observe o - unsafeAsyncObserveIO (DeltaObservable o) = unsafeAsyncObserveIO o + oldObserve (DeltaObservable o) = oldObserve o instance IsDeltaObservable k v (DeltaObservable k v) where subscribeDelta (DeltaObservable o) = subscribeDelta o instance Functor (DeltaObservable k) where @@ -61,7 +60,6 @@ data MappedDeltaObservable k b = forall a o. IsDeltaObservable k a o => MappedDe instance IsRetrievable (HM.HashMap k b) (MappedDeltaObservable k b) where retrieve (MappedDeltaObservable f o) = fmap f <<$>> retrieve o instance IsObservable (HM.HashMap k b) (MappedDeltaObservable k b) where - observe (MappedDeltaObservable f o) callback = observe o (callback . fmap (fmap f)) - unsafeAsyncObserveIO (MappedDeltaObservable f o) callback = unsafeAsyncObserveIO o (callback . fmap (fmap f)) + oldObserve (MappedDeltaObservable f o) callback = oldObserve o (callback . fmap (fmap f)) instance IsDeltaObservable k b (MappedDeltaObservable k b) where subscribeDelta (MappedDeltaObservable f o) callback = subscribeDelta o (callback . fmap f) diff --git a/src/Quasar/Observable/ObservableHashMap.hs b/src/Quasar/Observable/ObservableHashMap.hs index ef9987402b3a9e778ed34a2e3506be6e99da812e..0539f666de2f7f4033d441a9e9689d4e5617fe6c 100644 --- a/src/Quasar/Observable/ObservableHashMap.hs +++ b/src/Quasar/Observable/ObservableHashMap.hs @@ -39,7 +39,7 @@ makeLensesWith (lensField .~ (\_ _ -> pure . TopName . mkName . ("_" <>) . nameB instance IsRetrievable (HM.HashMap k v) (ObservableHashMap k v) where retrieve (ObservableHashMap mvar) = liftIO $ pure . HM.mapMaybe value . keyHandles <$> readMVar mvar instance IsObservable (HM.HashMap k v) (ObservableHashMap k v) where - unsafeAsyncObserveIO ohm callback = liftIO $ modifyHandle update ohm + oldObserve ohm callback = liftIO $ modifyHandle update ohm where update :: Handle k v -> IO (Handle k v, Disposable) update handle = do diff --git a/src/Quasar/Observable/ObservablePriority.hs b/src/Quasar/Observable/ObservablePriority.hs index 82c9c08f804e2606be2ee1f9f2cde7dd58fb9d6b..c0528bc4367f60b9e5940e2ff536d21892982477 100644 --- a/src/Quasar/Observable/ObservablePriority.hs +++ b/src/Quasar/Observable/ObservablePriority.hs @@ -25,7 +25,7 @@ instance IsRetrievable (Maybe v) (ObservablePriority p v) where getValueFromInternals Internals{current=Nothing} = Nothing getValueFromInternals Internals{current=Just (_, _, value)} = Just value instance IsObservable (Maybe v) (ObservablePriority p v) where - unsafeAsyncObserveIO (ObservablePriority mvar) callback = do + oldObserve (ObservablePriority mvar) callback = do key <- newUnique modifyMVar_ mvar $ \internals@Internals{subscribers} -> do -- Call listener diff --git a/test/Quasar/Observable/ObservableHashMapSpec.hs b/test/Quasar/Observable/ObservableHashMapSpec.hs index 0bef67ae470b7a9f13f932b486b31128381e54f2..5b658bcf676ab9238cf45afd309ede4593132d25 100644 --- a/test/Quasar/Observable/ObservableHashMapSpec.hs +++ b/test/Quasar/Observable/ObservableHashMapSpec.hs @@ -28,7 +28,7 @@ spec = parallel $ do lastCallbackValue <- newIORef undefined om <- OM.new :: IO (OM.ObservableHashMap String String) - subscriptionHandle <- unsafeAsyncObserveIO om $ writeIORef lastCallbackValue + subscriptionHandle <- oldObserve om $ writeIORef lastCallbackValue let lastCallbackShouldBe expected = do (ObservableUpdate update) <- readIORef lastCallbackValue update `shouldBe` expected @@ -85,7 +85,7 @@ spec = parallel $ do om <- OM.new :: IO (OM.ObservableHashMap String String) - void $ unsafeAsyncObserveIO (OM.observeKey "key1" om) (writeIORef value1) + void $ oldObserve (OM.observeKey "key1" om) (writeIORef value1) let v1ShouldBe expected = do (ObservableUpdate update) <- readIORef value1 update `shouldBe` expected @@ -98,7 +98,7 @@ spec = parallel $ do OM.insert "key2" "value2" om v1ShouldBe $ Just "value1" - handle2 <- unsafeAsyncObserveIO (OM.observeKey "key2" om) (writeIORef value2) + handle2 <- oldObserve (OM.observeKey "key2" om) (writeIORef value2) let v2ShouldBe expected = do (ObservableUpdate update) <- readIORef value2 update `shouldBe` expected diff --git a/test/Quasar/Observable/ObservablePrioritySpec.hs b/test/Quasar/Observable/ObservablePrioritySpec.hs index e58d9d97600d174d6c79c20f34bd2dbbd8c34655..45beb23917e64278aaecc2b8f33e41fd4a09e3b0 100644 --- a/test/Quasar/Observable/ObservablePrioritySpec.hs +++ b/test/Quasar/Observable/ObservablePrioritySpec.hs @@ -33,7 +33,7 @@ spec = do x `shouldBe` expected (op :: ObservablePriority Int String) <- OP.create - _s <- unsafeAsyncObserveIO op (modifyIORef result . (:)) + _s <- oldObserve op (modifyIORef result . (:)) mostRecentShouldBe Nothing p2 <- OP.insertValue op 2 "p2" diff --git a/test/Quasar/ObservableSpec.hs b/test/Quasar/ObservableSpec.hs index 7dd4e12e7ab26d219910cd46eeebfe61f06e0858..6987ee17f8e449924352e5821509016e0e70564b 100644 --- a/test/Quasar/ObservableSpec.hs +++ b/test/Quasar/ObservableSpec.hs @@ -41,7 +41,7 @@ mergeObservableSpec = do let mergedObservable = mergeObservable (,) a b (latestRef :: IORef (ObservableMessage (String, String))) <- newIORef (ObservableUpdate ("", "")) - void $ unsafeAsyncObserveIO mergedObservable (writeIORef latestRef) + void $ oldObserve mergedObservable (writeIORef latestRef) let latestShouldBe expected = do (ObservableUpdate x) <- readIORef latestRef x `shouldBe` expected