diff --git a/src/Quasar/Observable.hs b/src/Quasar/Observable.hs index 7cbb126990fd6d8b6fe2a818993d7709c2fa40d0..e8faa5d6f69b28113748a845281bc11f17d483c0 100644 --- a/src/Quasar/Observable.hs +++ b/src/Quasar/Observable.hs @@ -86,30 +86,38 @@ class IsRetrievable v o => IsObservable v o | o -> v where msgVar <- liftIO $ newTVarIO ObservableLoading idVar <- liftIO $ newTVarIO (0 :: Word64) calledIdVar <- liftIO $ newTVarIO (0 :: Word64) + completedVar <- newAsyncVar resourceManager <- askResourceManager - bracketOnError + finally do - -- This implementation is a temporary compatability wrapper and forking isn't necessary with the new design. - forkTask do - attachDisposable resourceManager =<< liftIO do - unsafeAsyncObserveIO observable \msg -> do - currentMsgId <- atomically do - writeTVar msgVar msg - stateTVar idVar (dup . (+ 1)) - -- Wait for `callback` to complete - atomically do - readTVar calledIdVar >>= \calledId -> - unless (calledId >= currentMsgId) retry - do disposeAndAwait + bracketOnError + do + -- This implementation is a temporary compatability wrapper and forking isn't necessary with the new design. + forkTask do + attachDisposable resourceManager =<< liftIO do + unsafeAsyncObserveIO observable \msg -> do + currentMsgId <- atomically do + writeTVar msgVar msg + stateTVar idVar (dup . (+ 1)) + -- Wait for `callback` to complete + awaitAny2 + do toAwaitable completedVar + do + unsafeAwaitSTM do + readTVar calledIdVar >>= \calledId -> + unless (calledId >= currentMsgId) retry + do disposeAndAwait + do + const $ forever do + (msgId, msg) <- liftIO $ atomically do + msgAvailable <- liftA2 (>) (readTVar idVar) (readTVar calledIdVar) + unless msgAvailable retry + liftA2 (,) (readTVar idVar) (readTVar msgVar) + callback msg + liftIO $ atomically $ writeTVar calledIdVar msgId do - const $ forever do - (msgId, msg) <- liftIO $ atomically do - msgAvailable <- liftA2 (>) (readTVar idVar) (readTVar calledIdVar) - unless msgAvailable retry - liftA2 (,) (readTVar idVar) (readTVar msgVar) - callback msg - liftIO $ atomically $ writeTVar calledIdVar msgId + putAsyncVar completedVar () unsafeAsyncObserveIO :: o -> (ObservableMessage v -> IO ()) -> IO Disposable unsafeAsyncObserveIO observable callback = do