diff --git a/src/Quasar/Observable.hs b/src/Quasar/Observable.hs index 2a6254093b7be005a89674f0c21e3caf6fbced16..b87fec59a91c325f68d6df3c0eb0a629fa0ec915 100644 --- a/src/Quasar/Observable.hs +++ b/src/Quasar/Observable.hs @@ -113,41 +113,20 @@ class IsRetrievable v o => IsObservable v o | o -> v where -- | Observes an observable by handling updates on the current thread. observeBlocking :: (IsObservable v o, MonadResourceManager m) => o -> (ObservableMessage v -> m ()) -> m a observeBlocking observable callback = do - msgVar <- liftIO $ newTVarIO ObservableLoading - idVar <- liftIO $ newTVarIO (0 :: Word64) - calledIdVar <- liftIO $ newTVarIO (0 :: Word64) - completedVar <- newAsyncVar - - resourceManager <- askResourceManager - finally - do - bracketOnError - do - -- This implementation is a temporary compatability wrapper and forking isn't necessary with the new design. - forkTask do - attachDisposable resourceManager =<< liftIO do - oldObserve observable \msg -> do - currentMsgId <- atomically do - writeTVar msgVar msg - stateTVar idVar (dup . (+ 1)) - -- Wait for `callback` to complete - awaitAny2 - do toAwaitable completedVar - do - unsafeAwaitSTM do - readTVar calledIdVar >>= \calledId -> - unless (calledId >= currentMsgId) retry - do disposeAndAwait - do - const $ forever do - (msgId, msg) <- liftIO $ atomically do - msgAvailable <- liftA2 (>) (readTVar idVar) (readTVar calledIdVar) - unless msgAvailable retry - liftA2 (,) (readTVar idVar) (readTVar msgVar) - callback msg - liftIO $ atomically $ writeTVar calledIdVar msgId - do - putAsyncVar completedVar () + withSubResourceManagerM do + var <- liftIO newEmptyTMVarIO + observe observable \msg -> do + liftIO $ atomically do + cbCompletedVar <- tryTakeTMVar var >>= \case + Nothing -> newAsyncVarSTM + Just (_, cbCompletedVar) -> pure cbCompletedVar + putTMVar var (msg, cbCompletedVar) + pure $ toAwaitable cbCompletedVar + + forever do + (msg, cbCompletedVar) <- liftIO $ atomically $ takeTMVar var + callback msg + putAsyncVar_ cbCompletedVar () asyncObserve :: IsObservable v o => MonadAsync m => o -> (ObservableMessage v -> m ()) -> m ()