From 499e025611d10540dbebf0e2732a0316a6e111bd Mon Sep 17 00:00:00 2001 From: Jens Nolte <git@queezle.net> Date: Mon, 30 Aug 2021 01:27:12 +0200 Subject: [PATCH] Fix compatability observe implementaton --- src/Quasar/Observable.hs | 27 ++++++++++++++++++--------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/src/Quasar/Observable.hs b/src/Quasar/Observable.hs index 7b9c636..eea49d5 100644 --- a/src/Quasar/Observable.hs +++ b/src/Quasar/Observable.hs @@ -91,20 +91,29 @@ class IsRetrievable v o => IsObservable v o | o -> v where idVar <- liftIO $ newTVarIO (0 :: Word64) calledIdVar <- liftIO $ newTVarIO (0 :: Word64) + resourceManager <- askResourceManager bracketOnError do - liftIO $ unsafeAsyncObserveIO observable \msg -> do - currentMessage <- atomically do - writeTVar msgVar msg - stateTVar idVar (dup . (+ 1)) - -- Wait for `callback` to complete - atomically do - readTVar calledIdVar >>= \called -> - unless (called >= currentMessage) retry + -- HACK: use async to fork on MonadResourceManager + -- This should use MonadAsync instead, but this implementation is a temporary compatability wrapper and the + -- constraints are based on the new design. + liftIO $ onResourceManager resourceManager $ async do + attachDisposable resourceManager =<< liftIO do + unsafeAsyncObserveIO observable \msg -> do + currentMsgId <- atomically do + writeTVar msgVar msg + stateTVar idVar (dup . (+ 1)) + -- Wait for `callback` to complete + atomically do + readTVar calledIdVar >>= \calledId -> + unless (calledId >= currentMsgId) retry do disposeAndAwait do const $ forever do - (msgId, msg) <- liftIO $ atomically $ liftA2 (,) (readTVar idVar) (readTVar msgVar) + (msgId, msg) <- liftIO $ atomically do + msgAvailable <- liftA2 (>) (readTVar idVar) (readTVar calledIdVar) + unless msgAvailable retry + liftA2 (,) (readTVar idVar) (readTVar msgVar) callback msg liftIO $ atomically $ writeTVar calledIdVar msgId -- GitLab