Skip to content
Snippets Groups Projects
Commit 499e0256 authored by Jens Nolte's avatar Jens Nolte
Browse files

Fix compatability observe implementaton

parent c3c90379
No related branches found
No related tags found
No related merge requests found
Pipeline #2416 passed
......@@ -91,20 +91,29 @@ class IsRetrievable v o => IsObservable v o | o -> v where
idVar <- liftIO $ newTVarIO (0 :: Word64)
calledIdVar <- liftIO $ newTVarIO (0 :: Word64)
resourceManager <- askResourceManager
bracketOnError
do
liftIO $ unsafeAsyncObserveIO observable \msg -> do
currentMessage <- atomically do
writeTVar msgVar msg
stateTVar idVar (dup . (+ 1))
-- Wait for `callback` to complete
atomically do
readTVar calledIdVar >>= \called ->
unless (called >= currentMessage) retry
-- HACK: use async to fork on MonadResourceManager
-- This should use MonadAsync instead, but this implementation is a temporary compatability wrapper and the
-- constraints are based on the new design.
liftIO $ onResourceManager resourceManager $ async do
attachDisposable resourceManager =<< liftIO do
unsafeAsyncObserveIO observable \msg -> do
currentMsgId <- atomically do
writeTVar msgVar msg
stateTVar idVar (dup . (+ 1))
-- Wait for `callback` to complete
atomically do
readTVar calledIdVar >>= \calledId ->
unless (calledId >= currentMsgId) retry
do disposeAndAwait
do
const $ forever do
(msgId, msg) <- liftIO $ atomically $ liftA2 (,) (readTVar idVar) (readTVar msgVar)
(msgId, msg) <- liftIO $ atomically do
msgAvailable <- liftA2 (>) (readTVar idVar) (readTVar calledIdVar)
unless msgAvailable retry
liftA2 (,) (readTVar idVar) (readTVar msgVar)
callback msg
liftIO $ atomically $ writeTVar calledIdVar msgId
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment