Skip to content
Snippets Groups Projects
Commit 5a3a3172 authored by Jens Nolte's avatar Jens Nolte
Browse files

Rewrite observeBlocking for new observable signature


Co-authored-by: default avatarJan Beinke <git@janbeinke.com>
parent 90cea54d
No related branches found
No related tags found
No related merge requests found
......@@ -113,41 +113,20 @@ class IsRetrievable v o => IsObservable v o | o -> v where
-- | Observes an observable by handling updates on the current thread.
observeBlocking :: (IsObservable v o, MonadResourceManager m) => o -> (ObservableMessage v -> m ()) -> m a
observeBlocking observable callback = do
msgVar <- liftIO $ newTVarIO ObservableLoading
idVar <- liftIO $ newTVarIO (0 :: Word64)
calledIdVar <- liftIO $ newTVarIO (0 :: Word64)
completedVar <- newAsyncVar
resourceManager <- askResourceManager
finally
do
bracketOnError
do
-- This implementation is a temporary compatability wrapper and forking isn't necessary with the new design.
forkTask do
attachDisposable resourceManager =<< liftIO do
oldObserve observable \msg -> do
currentMsgId <- atomically do
writeTVar msgVar msg
stateTVar idVar (dup . (+ 1))
-- Wait for `callback` to complete
awaitAny2
do toAwaitable completedVar
do
unsafeAwaitSTM do
readTVar calledIdVar >>= \calledId ->
unless (calledId >= currentMsgId) retry
do disposeAndAwait
do
const $ forever do
(msgId, msg) <- liftIO $ atomically do
msgAvailable <- liftA2 (>) (readTVar idVar) (readTVar calledIdVar)
unless msgAvailable retry
liftA2 (,) (readTVar idVar) (readTVar msgVar)
callback msg
liftIO $ atomically $ writeTVar calledIdVar msgId
do
putAsyncVar completedVar ()
withSubResourceManagerM do
var <- liftIO newEmptyTMVarIO
observe observable \msg -> do
liftIO $ atomically do
cbCompletedVar <- tryTakeTMVar var >>= \case
Nothing -> newAsyncVarSTM
Just (_, cbCompletedVar) -> pure cbCompletedVar
putTMVar var (msg, cbCompletedVar)
pure $ toAwaitable cbCompletedVar
forever do
(msg, cbCompletedVar) <- liftIO $ atomically $ takeTMVar var
callback msg
putAsyncVar_ cbCompletedVar ()
asyncObserve :: IsObservable v o => MonadAsync m => o -> (ObservableMessage v -> m ()) -> m ()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment