diff --git a/src/Quasar/Observable.hs b/src/Quasar/Observable.hs index 7b9c6369c1439f8bd18f6bc5b602e75f66ed86b4..eea49d59478ec70a33858ef3838adbd7a658134f 100644 --- a/src/Quasar/Observable.hs +++ b/src/Quasar/Observable.hs @@ -91,20 +91,29 @@ class IsRetrievable v o => IsObservable v o | o -> v where idVar <- liftIO $ newTVarIO (0 :: Word64) calledIdVar <- liftIO $ newTVarIO (0 :: Word64) + resourceManager <- askResourceManager bracketOnError do - liftIO $ unsafeAsyncObserveIO observable \msg -> do - currentMessage <- atomically do - writeTVar msgVar msg - stateTVar idVar (dup . (+ 1)) - -- Wait for `callback` to complete - atomically do - readTVar calledIdVar >>= \called -> - unless (called >= currentMessage) retry + -- HACK: use async to fork on MonadResourceManager + -- This should use MonadAsync instead, but this implementation is a temporary compatability wrapper and the + -- constraints are based on the new design. + liftIO $ onResourceManager resourceManager $ async do + attachDisposable resourceManager =<< liftIO do + unsafeAsyncObserveIO observable \msg -> do + currentMsgId <- atomically do + writeTVar msgVar msg + stateTVar idVar (dup . (+ 1)) + -- Wait for `callback` to complete + atomically do + readTVar calledIdVar >>= \calledId -> + unless (calledId >= currentMsgId) retry do disposeAndAwait do const $ forever do - (msgId, msg) <- liftIO $ atomically $ liftA2 (,) (readTVar idVar) (readTVar msgVar) + (msgId, msg) <- liftIO $ atomically do + msgAvailable <- liftA2 (>) (readTVar idVar) (readTVar calledIdVar) + unless msgAvailable retry + liftA2 (,) (readTVar idVar) (readTVar msgVar) callback msg liftIO $ atomically $ writeTVar calledIdVar msgId