From 93cccac990339c6261b4cfcba1be5095a1951251 Mon Sep 17 00:00:00 2001
From: Jens Nolte <git@queezle.net>
Date: Fri, 29 Oct 2021 00:43:29 +0200
Subject: [PATCH] Reimplement observe; remove oldObserve

Co-authored-by: Jan Beinke <git@janbeinke.com>
---
 src/Quasar/Observable.hs                      | 318 ++++++------------
 src/Quasar/Observable/Delta.hs                |   7 +-
 src/Quasar/Observable/ObservableHashMap.hs    |  82 ++---
 src/Quasar/Observable/ObservablePriority.hs   |  30 +-
 src/Quasar/ResourceManager.hs                 |  41 ++-
 .../Observable/ObservableHashMapSpec.hs       |  77 +++--
 .../Observable/ObservablePrioritySpec.hs      |  41 ++-
 test/Quasar/ObservableSpec.hs                 |  19 +-
 8 files changed, 284 insertions(+), 331 deletions(-)

diff --git a/src/Quasar/Observable.hs b/src/Quasar/Observable.hs
index bca82b9..4645ec4 100644
--- a/src/Quasar/Observable.hs
+++ b/src/Quasar/Observable.hs
@@ -3,7 +3,6 @@
 module Quasar.Observable (
   -- * Observable core types
   IsRetrievable(..),
-  retrieveIO,
   IsObservable(..),
   Observable(..),
   ObservableMessage(..),
@@ -13,9 +12,8 @@ module Quasar.Observable (
   ObservableVar,
   newObservableVar,
   setObservableVar,
-  withObservableVar,
   modifyObservableVar,
-  modifyObservableVar_,
+  stateObservableVar,
 
   -- * Helper functions
   observeWhile,
@@ -23,9 +21,6 @@ module Quasar.Observable (
   observeBlocking,
   fnObservable,
   synchronousFnObservable,
-  mergeObservable,
-  joinObservable,
-  bindObservable,
   unsafeObservableIO,
 
   -- * Helper types
@@ -76,37 +71,22 @@ toObservableUpdate (ObservableNotAvailable ex) = throwM ex
 class IsRetrievable v a | a -> v where
   retrieve :: MonadResourceManager m => a -> m (Awaitable v)
 
--- TODO remove
-retrieveIO :: IsRetrievable v a => a -> IO v
-retrieveIO x = withRootResourceManager $ await =<< retrieve x
-
 class IsRetrievable v o => IsObservable v o | o -> v where
   -- | Register a callback to observe changes. The callback is called when the value changes, but depending on the
   -- delivery method (e.g. network) intermediate values may be skipped.
   --
-  -- A correct implementation of observe will call the callback during registration (if no value is available
+  -- A correct implementation of observe must call the callback during registration (if no value is available
   -- immediately an `ObservableLoading` will be delivered).
   --
   -- The callback must return without blocking, otherwise other callbacks will be delayed. If the value can't be
-  -- processed immediately, use `observeBlocking` instead or manually pass the value e.g. by using STM.
+  -- processed immediately, use `observeBlocking` instead or manually pass the value to a thread that processes the
+  -- data, e.g. by using STM.
   observe
     :: MonadResourceManager m
     => o -- ^ observable
-    -> (forall f. MonadResourceManager f => ObservableMessage v -> f ()) -- ^ callback
+    -> (ObservableMessage v -> ResourceManagerIO ()) -- ^ callback
     -> m ()
-  -- NOTE Compatability implementation, has to be removed when `oldObserve` is removed
-  observe observable callback = mask_ do
-    resourceManager <- askResourceManager
-    disposable <- liftIO $ oldObserve observable (\msg -> runReaderT (callback msg) resourceManager)
-    registerDisposable disposable
-
-  -- | Old signature of `observe`, will be removed from the class once it's no longer used for implementations.
-  oldObserve :: o -> (ObservableMessage v -> IO ()) -> IO Disposable
-  oldObserve observable callback = do
-    resourceManager <- (undefined :: IO ResourceManager)
-    onResourceManager resourceManager do
-      observe observable $ \msg -> liftIO (callback msg)
-    pure $ toDisposable resourceManager
+  observe observable = observe (toObservable observable)
 
   toObservable :: o -> Observable v
   toObservable = Observable
@@ -114,10 +94,7 @@ class IsRetrievable v o => IsObservable v o | o -> v where
   mapObservable :: (v -> a) -> o -> Observable a
   mapObservable f = Observable . MappedObservable f
 
-  {-# MINIMAL observe | oldObserve #-}
-  -- TODO the goal: {-# MINIMAL toObservable | observe #-}
-
-{-# DEPRECATED oldObserve "Old implementation of `observe`." #-}
+  {-# MINIMAL toObservable | observe #-}
 
 
 -- | Observe an observable by handling updates on the current thread.
@@ -140,6 +117,7 @@ observeBlocking observable handler = do
       handler msg
 
 
+-- | Internal control flow exception for `observeWhile` and `observeWhile_`.
 data ObserveWhileCompleted = ObserveWhileCompleted
   deriving stock (Eq, Show)
 
@@ -179,7 +157,6 @@ instance IsRetrievable v (Observable v) where
   retrieve (Observable o) = retrieve o
 instance IsObservable v (Observable v) where
   observe (Observable o) = observe o
-  oldObserve (Observable o) = oldObserve o
   toObservable = id
   mapObservable f (Observable o) = mapObservable f o
 
@@ -188,7 +165,7 @@ instance Functor Observable where
 
 instance Applicative Observable where
   pure = toObservable . ConstObservable
-  liftA2 fn x y = toObservable $ MergedObservable fn x y
+  liftA2 fn x y = toObservable $ LiftA2Observable fn x y
 
 instance Monad Observable where
   x >>= y = toObservable $ BindObservable x y
@@ -216,7 +193,6 @@ instance IsRetrievable v (MappedObservable v) where
   retrieve (MappedObservable f observable) = f <<$>> retrieve observable
 instance IsObservable v (MappedObservable v) where
   observe (MappedObservable fn observable) callback = observe observable (callback . fmap fn)
-  oldObserve (MappedObservable fn observable) callback = oldObserve observable (callback . fmap fn)
   mapObservable f1 (MappedObservable f2 upstream) = Observable $ MappedObservable (f1 . f2) upstream
 
 
@@ -229,66 +205,40 @@ instance IsRetrievable r (BindObservable r) where
     retrieve $ fn x
 
 instance IsObservable r (BindObservable r) where
-  oldObserve :: BindObservable r -> (ObservableMessage r -> IO ()) -> IO Disposable
-  oldObserve (BindObservable fx fn) callback = do
-    -- Create a resource manager to ensure all subscriptions are cleaned up when disposing.
-    resourceManager <- (undefined :: IO ResourceManager)
+  observe :: MonadResourceManager m => (BindObservable r) -> (ObservableMessage r -> ResourceManagerIO ()) -> m ()
+  observe (BindObservable fx fn) callback = do
+    disposableVar <- liftIO $ newTMVarIO noDisposable
+    keyVar <- liftIO $ newTMVarIO =<< newUnique
 
-    isDisposingVar <- newTVarIO False
-    disposableVar <- newTMVarIO noDisposable
-    keyVar <- newTMVarIO Nothing
-
-    leftDisposable <- oldObserve fx (outerCallback resourceManager isDisposingVar disposableVar keyVar)
-
-    attachDisposeAction_ resourceManager $ undefined -- do
-      --atomically $ writeTVar isDisposingVar True
-      --d1 <- dispose leftDisposable
-      ---- Block while the `outerCallback` is running
-      --d2 <- dispose =<< atomically (takeTMVar disposableVar)
-      --pure (d1 <> d2)
-
-    pure $ toDisposable resourceManager
+    observe fx (leftCallback disposableVar keyVar)
     where
-      outerCallback resourceManager isDisposingVar disposableVar keyVar observableMessage = mask $ \unmask -> do
-        key <- newUnique
-
-        join $ atomically $ do
-          readTVar isDisposingVar >>= \case
-            False -> do
-              -- Blocks while an inner callback is running
-              void $ swapTMVar keyVar (Just key)
+      leftCallback disposableVar keyVar message = do
+        key <- liftIO newUnique
 
-              oldDisposable <- takeTMVar disposableVar
+        oldDisposable <- liftIO $ atomically do
+          -- Blocks while `rightCallback` is running
+          void $ swapTMVar keyVar key
 
-              -- IO action that will run after the STM transaction
-              pure do
-                onResourceManager resourceManager do
-                  disposeEventually oldDisposable
+          takeTMVar disposableVar
 
-                disposable <-
-                  unmask (outerMessageHandler key observableMessage)
-                    `onException`
-                      atomically (putTMVar disposableVar noDisposable)
+        disposeEventually_ oldDisposable
 
-                atomically $ putTMVar disposableVar disposable
+        disposable <- case message of
+          (ObservableUpdate x) -> captureDisposable_ $ observe (fn x) (rightCallback keyVar key)
+          ObservableLoading -> noDisposable <$ callback ObservableLoading
+          (ObservableNotAvailable ex) -> noDisposable <$ callback (ObservableNotAvailable ex)
 
-            -- When already disposing no new handlers should be registered
-            True -> pure $ pure ()
+        liftIO $ atomically $ putTMVar disposableVar disposable
 
-        where
-          outerMessageHandler key (ObservableUpdate x) = oldObserve (fn x) (innerCallback key)
-          outerMessageHandler _ ObservableLoading = noDisposable <$ callback ObservableLoading
-          outerMessageHandler _ (ObservableNotAvailable ex) = noDisposable <$ callback (ObservableNotAvailable ex)
-
-          innerCallback :: Unique -> ObservableMessage r -> IO ()
-          innerCallback key x = do
-            bracket
-              -- Take key var to prevent parallel callbacks
-              (atomically $ takeTMVar keyVar)
-              -- Put key back
-              (atomically . putTMVar keyVar)
-              -- Call callback when key is still valid
-              (\currentKey -> when (Just key == currentKey) $ callback x)
+      rightCallback :: TMVar Unique -> Unique -> ObservableMessage r -> ResourceManagerIO ()
+      rightCallback keyVar key message =
+        bracket
+          -- Take key var to prevent parallel callbacks
+          (liftIO $ atomically $ takeTMVar keyVar)
+          -- Put key back
+          (liftIO . atomically . putTMVar keyVar)
+          -- Ignore all callbacks that arrive from the old `fn` when a new `fx` has been observed
+          (\currentKey -> when (key == currentKey) $ callback message)
 
 
 data CatchObservable e r = Exception e => CatchObservable (Observable r) (e -> Observable r)
@@ -297,65 +247,39 @@ instance IsRetrievable r (CatchObservable e r) where
   retrieve (CatchObservable fx fn) = retrieve fx `catch` \ex -> retrieve (fn ex)
 
 instance IsObservable r (CatchObservable e r) where
-  oldObserve :: CatchObservable e r -> (ObservableMessage r -> IO ()) -> IO Disposable
-  oldObserve (CatchObservable fx fn) callback = do
-    -- Create a resource manager to ensure all subscriptions are cleaned up when disposing.
-    resourceManager <- (undefined :: IO ResourceManager)
-
-    isDisposingVar <- newTVarIO False
-    disposableVar <- newTMVarIO noDisposable
-    keyVar <- newTMVarIO Nothing
-
-    leftDisposable <- oldObserve fx (outerCallback resourceManager isDisposingVar disposableVar keyVar)
-
-    attachDisposeAction_ resourceManager $ undefined -- do
-      --atomically $ writeTVar isDisposingVar True
-      --d1 <- dispose leftDisposable
-      ---- Block while the `outerCallback` is running
-      --d2 <- dispose =<< atomically (takeTMVar disposableVar)
-      --pure (d1 <> d2)
+  observe :: MonadResourceManager m => (CatchObservable e r) -> (ObservableMessage r -> ResourceManagerIO ()) -> m ()
+  observe (CatchObservable fx fn) callback = do
+    disposableVar <- liftIO $ newTMVarIO noDisposable
+    keyVar <- liftIO $ newTMVarIO =<< newUnique
 
-    pure $ toDisposable resourceManager
+    observe fx (leftCallback disposableVar keyVar)
     where
-      outerCallback resourceManager isDisposingVar disposableVar keyVar observableMessage = mask $ \unmask -> do
-        key <- newUnique
+      leftCallback disposableVar keyVar message = do
+        key <- liftIO newUnique
 
-        join $ atomically $ do
-          readTVar isDisposingVar >>= \case
-            False -> do
-              -- Blocks while an inner callback is running
-              void $ swapTMVar keyVar (Just key)
+        oldDisposable <- liftIO $ atomically do
+          -- Blocks while `rightCallback` is running
+          void $ swapTMVar keyVar key
 
-              oldDisposable <- takeTMVar disposableVar
+          takeTMVar disposableVar
 
-              -- IO action that will run after the STM transaction
-              pure do
-                onResourceManager resourceManager do
-                  disposeEventually oldDisposable
+        disposeEventually_ oldDisposable
 
-                disposable <-
-                  unmask (outerMessageHandler key observableMessage)
-                    `onException`
-                      atomically (putTMVar disposableVar noDisposable)
+        disposable <- case message of
+          (ObservableNotAvailable (fromException -> Just ex)) -> captureDisposable_ $ observe (fn ex) (rightCallback keyVar key)
+          msg -> noDisposable <$ callback msg
 
-                atomically $ putTMVar disposableVar disposable
+        liftIO $ atomically $ putTMVar disposableVar disposable
 
-            -- When already disposing no new handlers should be registered
-            True -> pure $ pure ()
-
-        where
-          outerMessageHandler key (ObservableNotAvailable (fromException -> Just ex)) = oldObserve (fn ex) (innerCallback key)
-          outerMessageHandler _ msg = noDisposable <$ callback msg
-
-          innerCallback :: Unique -> ObservableMessage r -> IO ()
-          innerCallback key x = do
-            bracket
-              -- Take key var to prevent parallel callbacks
-              (atomically $ takeTMVar keyVar)
-              -- Put key back
-              (atomically . putTMVar keyVar)
-              -- Call callback when key is still valid
-              (\currentKey -> when (Just key == currentKey) $ callback x)
+      rightCallback :: TMVar Unique -> Unique -> ObservableMessage r -> ResourceManagerIO ()
+      rightCallback keyVar key message =
+        bracket
+          -- Take key var to prevent parallel callbacks
+          (liftIO $ atomically $ takeTMVar keyVar)
+          -- Put key back
+          (liftIO . atomically . putTMVar keyVar)
+          -- Ignore all callbacks that arrive from the old `fn` when a new `fx` has been observed
+          (\currentKey -> when (key == currentKey) $ callback message)
 
 
 
@@ -363,13 +287,19 @@ newtype ObservableVar v = ObservableVar (MVar (v, HM.HashMap Unique (ObservableC
 instance IsRetrievable v (ObservableVar v) where
   retrieve (ObservableVar mvar) = liftIO $ pure . fst <$> readMVar mvar
 instance IsObservable v (ObservableVar v) where
-  oldObserve (ObservableVar mvar) callback = do
-    key <- newUnique
-    modifyMVar_ mvar $ \(state, subscribers) -> do
-      -- Call listener
-      callback (pure state)
-      pure (state, HM.insert key callback subscribers)
-    newDisposable (disposeFn key)
+  observe observable@(ObservableVar mvar) callback = do
+    resourceManager <- askResourceManager
+    key <- liftIO newUnique
+
+    registerNewResource_ do
+      let wrappedCallback = handleByResourceManager resourceManager . callback
+
+      liftIO $ modifyMVar_ mvar $ \(state, subscribers) -> do
+        -- Call listener with initial value
+        wrappedCallback (pure state)
+        pure (state, HM.insert key wrappedCallback subscribers)
+
+      newDisposable $ disposeFn key
     where
       disposeFn :: Unique -> IO ()
       disposeFn key = modifyMVar_ mvar (\(state, subscribers) -> pure (state, HM.delete key subscribers))
@@ -379,83 +309,54 @@ newObservableVar initialValue = liftIO do
   ObservableVar <$> newMVar (initialValue, HM.empty)
 
 setObservableVar :: MonadIO m => ObservableVar v -> v -> m ()
-setObservableVar (ObservableVar mvar) value = liftIO $ modifyMVar_ mvar $ \(_, subscribers) -> do
-  mapM_ (\callback -> callback (pure value)) subscribers
-  pure (value, subscribers)
+setObservableVar observable value = modifyObservableVar observable (const value)
 
-
--- TODO change function signature to pure or STM; swap v and a
-modifyObservableVar :: MonadIO m => ObservableVar v -> (v -> IO (v, a)) -> m a
-modifyObservableVar (ObservableVar mvar) f =
+stateObservableVar :: MonadIO m => ObservableVar v -> (v -> (a, v)) -> m a
+stateObservableVar observable@(ObservableVar mvar) f =
   liftIO $ modifyMVar mvar $ \(oldState, subscribers) -> do
-    (newState, result) <- f oldState
+    let (result, newState) = f oldState
     mapM_ (\callback -> callback (pure newState)) subscribers
     pure ((newState, subscribers), result)
 
--- TODO change update function signature to pure or STM
-modifyObservableVar_ :: MonadIO m => ObservableVar v -> (v -> IO v) -> m ()
-modifyObservableVar_ (ObservableVar mvar) f =
-  liftIO $ modifyMVar_ mvar $ \(oldState, subscribers) -> do
-    newState <- f oldState
-    mapM_ (\callback -> callback (pure newState)) subscribers
-    pure (newState, subscribers)
-
--- TODO change inner monad to `m` after reimplementing ObservableVar
-withObservableVar :: MonadIO m => ObservableVar v -> (v -> IO a) -> m a
-withObservableVar (ObservableVar mvar) f = liftIO $ withMVar mvar (f . fst)
-
-
-
-bindObservable :: (IsObservable a ma, IsObservable b mb) => ma -> (a -> mb) -> Observable b
-bindObservable fx fn = (toObservable fx) >>= \x -> toObservable (fn x)
+modifyObservableVar :: MonadIO m => ObservableVar v -> (v -> v) -> m ()
+modifyObservableVar observable f = stateObservableVar observable (((), ) . f)
 
-joinObservable :: (IsObservable i o, IsObservable v i) => o -> Observable v
-joinObservable = join . fmap toObservable . toObservable
 
 
 -- | Merge two observables using a given merge function. Whenever one of the inputs is updated, the resulting
 -- observable updates according to the merge function.
 --
 -- There is no caching involed, every subscriber effectively subscribes to both input observables.
-data MergedObservable r o0 v0 o1 v1 = MergedObservable (v0 -> v1 -> r) o0 o1
-instance forall r o0 v0 o1 v1. (IsRetrievable v0 o0, IsRetrievable v1 o1) => IsRetrievable r (MergedObservable r o0 v0 o1 v1) where
-  retrieve (MergedObservable merge obs0 obs1) = liftA2 (liftA2 merge) (retrieve obs0) (retrieve obs1)
-instance forall r o0 v0 o1 v1. (IsObservable v0 o0, IsObservable v1 o1) => IsObservable r (MergedObservable r o0 v0 o1 v1) where
-  oldObserve (MergedObservable merge obs0 obs1) callback = do
-    var0 <- newTVarIO Nothing
-    var1 <- newTVarIO Nothing
-    d0 <- oldObserve obs0 (mergeCallback var0 var1 . writeTVar var0 . Just)
-    d1 <- oldObserve obs1 (mergeCallback var0 var1 . writeTVar var1 . Just)
-    undefined
-    --pure $ mconcat [d0, d1]
+data LiftA2Observable r = forall r0 r1. LiftA2Observable (r0 -> r1 -> r) (Observable r0) (Observable r1)
+
+instance IsRetrievable r (LiftA2Observable r) where
+  retrieve (LiftA2Observable fn fx fy) =
+    liftA2 (liftA2 fn) (retrieve fx) (retrieve fy)
+
+instance IsObservable r (LiftA2Observable r) where
+  observe (LiftA2Observable fn fx fy) callback = do
+    var0 <- liftIO $ newTVarIO Nothing
+    var1 <- liftIO $ newTVarIO Nothing
+    observe fx (mergeCallback var0 var1 . writeTVar var0 . Just)
+    observe fy (mergeCallback var0 var1 . writeTVar var1 . Just)
     where
-      mergeCallback :: TVar (Maybe (ObservableMessage v0)) -> TVar (Maybe (ObservableMessage v1)) -> STM () -> IO ()
       mergeCallback var0 var1 update = do
-        mMerged <- atomically $ do
+        mMerged <- liftIO $ atomically do
           update
-          runMaybeT $ liftA2 (liftA2 merge) (MaybeT (readTVar var0)) (MaybeT (readTVar var1))
+          runMaybeT $ liftA2 (liftA2 fn) (MaybeT (readTVar var0)) (MaybeT (readTVar var1))
 
         -- Run the callback only once both values have been received
         mapM_ callback mMerged
 
 
--- | Merge two observables using a given merge function. Whenever one of the inputs is updated, the resulting
--- observable updates according to the merge function.
---
--- Behaves like `liftA2` on `Observable` but accepts anything that implements `IsObservable`..
---
--- There is no caching involed, every subscriber effectively subscribes to both input observables.
-mergeObservable :: (IsObservable v0 o0, IsObservable v1 o1) => (v0 -> v1 -> r) -> o0 -> o1 -> Observable r
-mergeObservable merge x y = Observable $ MergedObservable merge x y
-
 data FnObservable v = FnObservable {
-  retrieveFn :: forall m. MonadResourceManager m => m (Awaitable v),
-  observeFn :: (ObservableMessage v -> IO ()) -> IO Disposable
+  retrieveFn :: ResourceManagerIO (Awaitable v),
+  observeFn :: (ObservableMessage v -> ResourceManagerIO ()) -> ResourceManagerIO ()
 }
 instance IsRetrievable v (FnObservable v) where
-  retrieve o = retrieveFn o
+  retrieve o = liftResourceManagerIO $ retrieveFn o
 instance IsObservable v (FnObservable v) where
-  oldObserve o = observeFn o
+  observe o = observe o
   mapObservable f FnObservable{retrieveFn, observeFn} = Observable $ FnObservable {
     retrieveFn = f <<$>> retrieveFn,
     observeFn = \listener -> observeFn (listener . fmap f)
@@ -463,19 +364,20 @@ instance IsObservable v (FnObservable v) where
 
 -- | Implement an Observable by directly providing functions for `retrieve` and `subscribe`.
 fnObservable
-  :: ((ObservableMessage v -> IO ()) -> IO Disposable)
-  -> (forall m. MonadResourceManager m => m (Awaitable v))
+  :: ((ObservableMessage v -> ResourceManagerIO ()) -> ResourceManagerIO ())
+  -> ResourceManagerIO (Awaitable v)
   -> Observable v
 fnObservable observeFn retrieveFn = toObservable FnObservable{observeFn, retrieveFn}
 
 -- | Implement an Observable by directly providing functions for `retrieve` and `subscribe`.
 synchronousFnObservable
-  :: forall v. ((ObservableMessage v -> IO ()) -> IO Disposable)
+  :: forall v.
+  ((ObservableMessage v -> ResourceManagerIO ()) -> ResourceManagerIO ())
   -> IO v
   -> Observable v
 synchronousFnObservable observeFn synchronousRetrieveFn = fnObservable observeFn retrieveFn
   where
-    retrieveFn :: (forall m. MonadResourceManager m => m (Awaitable v))
+    retrieveFn :: ResourceManagerIO (Awaitable v)
     retrieveFn = liftIO $ pure <$> synchronousRetrieveFn
 
 
@@ -484,7 +386,7 @@ instance IsRetrievable v (ConstObservable v) where
   retrieve (ConstObservable x) = pure $ pure x
 instance IsObservable v (ConstObservable v) where
   observe (ConstObservable x) callback = do
-    callback $ ObservableUpdate x
+    liftResourceManagerIO $ callback $ ObservableUpdate x
 
 
 newtype FailedObservable v = FailedObservable SomeException
@@ -492,22 +394,24 @@ instance IsRetrievable v (FailedObservable v) where
   retrieve (FailedObservable ex) = liftIO $ throwIO ex
 instance IsObservable v (FailedObservable v) where
   observe (FailedObservable ex) callback = do
-    callback $ ObservableNotAvailable ex
+    liftResourceManagerIO $ callback $ ObservableNotAvailable ex
 
 
 -- | Create an observable by simply running an IO action whenever a value is requested or a callback is registered.
 --
--- There is no mechanism to send more than one update, so the resulting `Observable` will only be correct in specific
--- situations.
+-- There is no mechanism to send more than one update, so the resulting `Observable` will only be useful in specific
+-- situations, e.g. as a primitive for building a cache where a static value has to be calculated/loaded).
+--
+-- The function supplied to unsafeObservableIO must produce the same value when called multiple times to create a
+-- correctly behaving observable.
 unsafeObservableIO :: forall v. IO v -> Observable v
 unsafeObservableIO action = synchronousFnObservable observeFn action
   where
-    observeFn :: (ObservableMessage v -> IO ()) -> IO Disposable
+    observeFn :: (ObservableMessage v -> ResourceManagerIO ()) -> ResourceManagerIO ()
     observeFn callback = do
       callback ObservableLoading
-      value <- (ObservableUpdate <$> action) `catchAll` (pure . ObservableNotAvailable @v)
+      value <- (ObservableUpdate <$> liftIO action) `catchAll` (pure . ObservableNotAvailable @v)
       callback value
-      pure noDisposable
 
 
 -- TODO implement
diff --git a/src/Quasar/Observable/Delta.hs b/src/Quasar/Observable/Delta.hs
index ea142c3..be92b74 100644
--- a/src/Quasar/Observable/Delta.hs
+++ b/src/Quasar/Observable/Delta.hs
@@ -30,7 +30,8 @@ instance (Eq k, Hashable k, Binary k, Binary v) => Binary (Delta k v) where
   put (Delete key) = B.put (2 :: Word8) >> B.put key
 
 class IsObservable (HM.HashMap k v) o => IsDeltaObservable k v o | o -> k, o -> v where
-  subscribeDelta :: o -> (Delta k v -> IO ()) -> IO Disposable
+  -- TODO change signature to use resource manager
+  subscribeDelta :: MonadIO m => o -> (Delta k v -> IO ()) -> m Disposable
 
 --observeHashMapDefaultImpl :: forall k v o. (Eq k, Hashable k) => IsDeltaObservable k v o => o -> (HM.HashMap k v -> IO ()) -> IO Disposable
 --observeHashMapDefaultImpl o callback = do
@@ -49,7 +50,7 @@ data DeltaObservable k v = forall o. IsDeltaObservable k v o => DeltaObservable
 instance IsRetrievable (HM.HashMap k v) (DeltaObservable k v) where
   retrieve (DeltaObservable o) = retrieve o
 instance IsObservable (HM.HashMap k v) (DeltaObservable k v) where
-  oldObserve (DeltaObservable o) = oldObserve o
+  observe (DeltaObservable o) = observe o
 instance IsDeltaObservable k v (DeltaObservable k v) where
   subscribeDelta (DeltaObservable o) = subscribeDelta o
 instance Functor (DeltaObservable k) where
@@ -60,6 +61,6 @@ data MappedDeltaObservable k b = forall a o. IsDeltaObservable k a o => MappedDe
 instance IsRetrievable (HM.HashMap k b) (MappedDeltaObservable k b) where
   retrieve (MappedDeltaObservable f o) = fmap f <<$>> retrieve o
 instance IsObservable (HM.HashMap k b) (MappedDeltaObservable k b) where
-  oldObserve (MappedDeltaObservable f o) callback = oldObserve o (callback . fmap (fmap f))
+  observe (MappedDeltaObservable f o) callback = observe o (callback . fmap (fmap f))
 instance IsDeltaObservable k b (MappedDeltaObservable k b) where
   subscribeDelta (MappedDeltaObservable f o) callback = subscribeDelta o (callback . fmap f)
diff --git a/src/Quasar/Observable/ObservableHashMap.hs b/src/Quasar/Observable/ObservableHashMap.hs
index 79daa03..c05a1fc 100644
--- a/src/Quasar/Observable/ObservableHashMap.hs
+++ b/src/Quasar/Observable/ObservableHashMap.hs
@@ -33,19 +33,20 @@ data KeyHandle v = KeyHandle {
 instance IsRetrievable (HM.HashMap k v) (ObservableHashMap k v) where
   retrieve (ObservableHashMap mvar) = liftIO $ pure . HM.mapMaybe value . keyHandles <$> readMVar mvar
 instance IsObservable (HM.HashMap k v) (ObservableHashMap k v) where
-  oldObserve ohm callback = liftIO $ modifyHandle update ohm
-    where
-      update :: Handle k v -> IO (Handle k v, Disposable)
-      update handle = do
-        callback $ pure $ toHashMap handle
-        key <- newUnique
-        let handle' = handle {subscribers = HM.insert key callback (subscribers handle)}
-        (handle',) <$> newDisposable (unsubscribe key)
-      unsubscribe :: Unique -> IO ()
-      unsubscribe key = modifyHandle_ (\handle -> pure handle {subscribers = HM.delete key (subscribers handle)}) ohm
+  observe = undefined
+--  oldObserve ohm callback = liftIO $ modifyHandle update ohm
+--    where
+--      update :: Handle k v -> IO (Handle k v, Disposable)
+--      update handle = do
+--        callback $ pure $ toHashMap handle
+--        key <- newUnique
+--        let handle' = handle {subscribers = HM.insert key callback (subscribers handle)}
+--        (handle',) <$> newDisposable (unsubscribe key)
+--      unsubscribe :: Unique -> IO ()
+--      unsubscribe key = modifyHandle_ (\handle -> pure handle {subscribers = HM.delete key (subscribers handle)}) ohm
 
 instance IsDeltaObservable k v (ObservableHashMap k v) where
-  subscribeDelta ohm callback = modifyHandle update ohm
+  subscribeDelta ohm callback = liftIO $ modifyHandle update ohm
     where
       update :: Handle k v -> IO (Handle k v, Disposable)
       update handle = do
@@ -106,39 +107,40 @@ notifySubscribers handle@Handle{deltaSubscribers, subscribers} (Just delta) = do
 modifyKeySubscribers :: (HM.HashMap Unique (ObservableMessage (Maybe v) -> IO ()) -> HM.HashMap Unique (ObservableMessage (Maybe v) -> IO ())) -> KeyHandle v -> KeyHandle v
 modifyKeySubscribers fn keyHandle = keyHandle {keySubscribers = fn (keySubscribers keyHandle)}
 
-new :: IO (ObservableHashMap k v)
-new = ObservableHashMap <$> newMVar Handle{keyHandles=HM.empty, subscribers=HM.empty, deltaSubscribers=HM.empty}
+new :: MonadIO m => m (ObservableHashMap k v)
+new = liftIO $ ObservableHashMap <$> newMVar Handle{keyHandles=HM.empty, subscribers=HM.empty, deltaSubscribers=HM.empty}
 
 observeKey :: forall k v. (Eq k, Hashable k) => k -> ObservableHashMap k v -> Observable (Maybe v)
-observeKey key ohm@(ObservableHashMap mvar) = synchronousFnObservable observeFn retrieveFn
-  where
-    retrieveFn :: IO (Maybe v)
-    retrieveFn = liftIO do
-      handle <- readMVar mvar
-      pure $ join $ fmap value $ HM.lookup key $ keyHandles handle
-    observeFn :: ((ObservableMessage (Maybe v) -> IO ()) -> IO Disposable)
-    observeFn callback = do
-      subscriptionKey <- newUnique
-      modifyKeyHandle_ (subscribeFn' subscriptionKey) key ohm
-      newDisposable (unsubscribe subscriptionKey)
-      where
-        subscribeFn' :: Unique -> KeyHandle v -> IO (KeyHandle v)
-        subscribeFn' subKey keyHandle@KeyHandle{value} = do
-          callback $ pure value
-          pure $ modifyKeySubscribers (HM.insert subKey callback) keyHandle
-        unsubscribe :: Unique -> IO ()
-        unsubscribe subKey = modifyKeyHandle_ (pure . modifyKeySubscribers (HM.delete subKey)) key ohm
-
-insert :: forall k v. (Eq k, Hashable k) => k -> v -> ObservableHashMap k v -> IO ()
-insert key value = modifyKeyHandleNotifying_ fn key
+observeKey = undefined
+--observeKey key ohm@(ObservableHashMap mvar) = synchronousFnObservable observeFn retrieveFn
+--  where
+--    retrieveFn :: IO (Maybe v)
+--    retrieveFn = liftIO do
+--      handle <- readMVar mvar
+--      pure $ join $ fmap value $ HM.lookup key $ keyHandles handle
+--    observeFn :: ((ObservableMessage (Maybe v) -> IO ()) -> IO Disposable)
+--    observeFn callback = do
+--      subscriptionKey <- newUnique
+--      modifyKeyHandle_ (subscribeFn' subscriptionKey) key ohm
+--      newDisposable (unsubscribe subscriptionKey)
+--      where
+--        subscribeFn' :: Unique -> KeyHandle v -> IO (KeyHandle v)
+--        subscribeFn' subKey keyHandle@KeyHandle{value} = do
+--          callback $ pure value
+--          pure $ modifyKeySubscribers (HM.insert subKey callback) keyHandle
+--        unsubscribe :: Unique -> IO ()
+--        unsubscribe subKey = modifyKeyHandle_ (pure . modifyKeySubscribers (HM.delete subKey)) key ohm
+
+insert :: forall k v m. (Eq k, Hashable k, MonadIO m) => k -> v -> ObservableHashMap k v -> m ()
+insert key value = liftIO . modifyKeyHandleNotifying_ fn key
   where
     fn :: KeyHandle v -> IO (KeyHandle v, Maybe (Delta k v))
     fn keyHandle@KeyHandle{keySubscribers} = do
       mapM_ ($ pure $ Just value) $ HM.elems keySubscribers
       pure (keyHandle{value=Just value}, Just (Insert key value))
 
-delete :: forall k v. (Eq k, Hashable k) => k -> ObservableHashMap k v -> IO ()
-delete key = modifyKeyHandleNotifying_ fn key
+delete :: forall k v m. (Eq k, Hashable k, MonadIO m) => k -> ObservableHashMap k v -> m ()
+delete key = liftIO . modifyKeyHandleNotifying_ fn key
   where
     fn :: KeyHandle v -> IO (KeyHandle v, Maybe (Delta k v))
     fn keyHandle@KeyHandle{value=oldValue, keySubscribers} = do
@@ -146,13 +148,13 @@ delete key = modifyKeyHandleNotifying_ fn key
       let delta = if isJust oldValue then Just (Delete key) else Nothing
       pure (keyHandle{value=Nothing}, delta)
 
-lookup :: forall k v. (Eq k, Hashable k) => k -> ObservableHashMap k v -> IO (Maybe v)
-lookup key (ObservableHashMap mvar) = do
+lookup :: forall k v m. (Eq k, Hashable k, MonadIO m) => k -> ObservableHashMap k v -> m (Maybe v)
+lookup key (ObservableHashMap mvar) = liftIO do
   Handle{keyHandles} <- readMVar mvar
   pure $ join $ value <$> HM.lookup key keyHandles
 
-lookupDelete :: forall k v. (Eq k, Hashable k) => k -> ObservableHashMap k v -> IO (Maybe v)
-lookupDelete key = modifyKeyHandleNotifying fn key
+lookupDelete :: forall k v m. (Eq k, Hashable k, MonadIO m) => k -> ObservableHashMap k v -> m (Maybe v)
+lookupDelete key = liftIO . modifyKeyHandleNotifying fn key
   where
     fn :: KeyHandle v -> IO (KeyHandle v, (Maybe (Delta k v), Maybe v))
     fn keyHandle@KeyHandle{value=oldValue, keySubscribers} = do
diff --git a/src/Quasar/Observable/ObservablePriority.hs b/src/Quasar/Observable/ObservablePriority.hs
index edb9c50..d341cf6 100644
--- a/src/Quasar/Observable/ObservablePriority.hs
+++ b/src/Quasar/Observable/ObservablePriority.hs
@@ -25,16 +25,17 @@ instance IsRetrievable (Maybe v) (ObservablePriority p v) where
       getValueFromInternals Internals{current=Nothing} = Nothing
       getValueFromInternals Internals{current=Just (_, _, value)} = Just value
 instance IsObservable (Maybe v) (ObservablePriority p v) where
-  oldObserve (ObservablePriority mvar) callback = do
-    key <- newUnique
-    modifyMVar_ mvar $ \internals@Internals{subscribers} -> do
-      -- Call listener
-      callback (pure (currentValue internals))
-      pure internals{subscribers = HM.insert key callback subscribers}
-    newDisposable (unsubscribe key)
-    where
-      unsubscribe :: Unique -> IO ()
-      unsubscribe key = modifyMVar_ mvar $ \internals@Internals{subscribers} -> pure internals{subscribers=HM.delete key subscribers}
+  observe = undefined
+  --oldObserve (ObservablePriority mvar) callback = do
+  --  key <- newUnique
+  --  modifyMVar_ mvar $ \internals@Internals{subscribers} -> do
+  --    -- Call listener
+  --    callback (pure (currentValue internals))
+  --    pure internals{subscribers = HM.insert key callback subscribers}
+  --  newDisposable (unsubscribe key)
+  --  where
+  --    unsubscribe :: Unique -> IO ()
+  --    unsubscribe key = modifyMVar_ mvar $ \internals@Internals{subscribers} -> pure internals{subscribers=HM.delete key subscribers}
 
 type PriorityMap p v = HM.HashMap p (NonEmpty (Entry v))
 
@@ -45,8 +46,9 @@ data Internals p v = Internals {
 }
 
 -- | Create a new `ObservablePriority` data structure.
-create :: IO (ObservablePriority p v)
-create = ObservablePriority <$> newMVar Internals {
+create :: MonadIO m => m (ObservablePriority p v)
+create = liftIO do
+  ObservablePriority <$> newMVar Internals {
     priorityMap = HM.empty,
     current = Nothing,
     subscribers = HM.empty
@@ -57,8 +59,8 @@ currentValue Internals{current} = (\(_, _, value) -> value) <$> current
 
 -- | Insert a value with an assigned priority into the data structure. If the priority is higher than the current highest priority the value will become the current value (and will be sent to subscribers). Otherwise the value will be stored and will only become the current value when all values with a higher priority and all values with the same priority that have been inserted earlier have been removed.
 -- Returns an `Disposable` that can be used to remove the value from the data structure.
-insertValue :: forall p v. (Ord p, Hashable p) => ObservablePriority p v -> p -> v -> IO Disposable
-insertValue (ObservablePriority mvar) priority value = modifyMVar mvar $ \internals -> do
+insertValue :: forall p v m. MonadIO m => (Ord p, Hashable p) => ObservablePriority p v -> p -> v -> m Disposable
+insertValue (ObservablePriority mvar) priority value = liftIO $ modifyMVar mvar $ \internals -> do
   key <- newUnique
   newInternals <- insertValue' key internals
   (newInternals,) <$> newDisposable (removeValue key)
diff --git a/src/Quasar/ResourceManager.hs b/src/Quasar/ResourceManager.hs
index 2b28bfe..cde4ab1 100644
--- a/src/Quasar/ResourceManager.hs
+++ b/src/Quasar/ResourceManager.hs
@@ -1,14 +1,19 @@
 module Quasar.ResourceManager (
   -- * MonadResourceManager
   MonadResourceManager(..),
+  ResourceManagerT,
+  ResourceManagerIO,
   FailedToRegisterResource,
   registerNewResource,
+  registerNewResource_,
   registerDisposable,
   registerDisposeAction,
   withSubResourceManagerM,
   onResourceManager,
   captureDisposable,
   captureDisposable_,
+  liftResourceManagerIO,
+  handleByResourceManager,
 
   -- ** Top level initialization
   withRootResourceManager,
@@ -47,6 +52,14 @@ import Quasar.Utils.Concurrent
 import Quasar.Utils.Exceptions
 
 
+
+-- TODO replacement for MonadAsync scheduler
+--scheduleAfter :: MonadScheduler m => Awaitable a -> (a -> SchedulerIO (Awaitable b)) -> m (Awaitable b)
+--scheduleAfter' :: Awaitable a -> (a -> SchedulerIO b) -> m (Awaitable b)
+--scheduleAfter_ :: Awaitable a -> (a -> IO ()) -> m ()
+
+
+
 data DisposeException = DisposeException SomeException
   deriving stock Show
   deriving anyclass Exception
@@ -122,6 +135,9 @@ registerNewResource action = mask_ do
     attachDisposable resourceManager resource
     pure resource
 
+registerNewResource_ :: (IsDisposable a, MonadResourceManager m) => m a -> m ()
+registerNewResource_ action = void $ registerNewResource action
+
 
 -- TODO rename to withResourceScope, subResourceManager or withResourceManager?
 withSubResourceManagerM :: MonadResourceManager m => m a -> m a
@@ -129,7 +145,10 @@ withSubResourceManagerM action =
   bracket newResourceManager dispose \scope -> localResourceManager scope action
 
 
-instance (MonadAwait m, MonadMask m, MonadIO m, MonadFix m) => MonadResourceManager (ReaderT ResourceManager m) where
+type ResourceManagerT = ReaderT ResourceManager
+type ResourceManagerIO = ResourceManagerT IO
+
+instance (MonadAwait m, MonadMask m, MonadIO m, MonadFix m) => MonadResourceManager (ResourceManagerT m) where
   localResourceManager resourceManager = local (const (toResourceManager resourceManager))
 
   askResourceManager = ask
@@ -142,13 +161,17 @@ instance {-# OVERLAPPABLE #-} MonadResourceManager m => MonadResourceManager (Re
     x <- ask
     lift $ localResourceManager resourceManager $ runReaderT action x
 
-
 -- TODO MonadResourceManager instances for StateT, WriterT, RWST, MaybeT, ...
 
 
-onResourceManager :: (IsResourceManager a, MonadIO m) => a -> ReaderT ResourceManager IO r -> m r
+onResourceManager :: (IsResourceManager a, MonadIO m) => a -> ResourceManagerIO r -> m r
 onResourceManager target action = liftIO $ runReaderT action (toResourceManager target)
 
+liftResourceManagerIO :: MonadResourceManager m => ResourceManagerIO r -> m r
+liftResourceManagerIO action = do
+  resourceManager <- askResourceManager
+  onResourceManager resourceManager action
+
 
 captureDisposable :: MonadResourceManager m => m a -> m (a, Disposable)
 captureDisposable action = do
@@ -160,6 +183,13 @@ captureDisposable action = do
 captureDisposable_ :: MonadResourceManager m => m () -> m Disposable
 captureDisposable_ = snd <<$>> captureDisposable
 
+-- | Run a computation and throw any exception that occurs to the resource manager.
+--
+-- This can be used to run e.g. callbacks that belong to a different resource context.
+handleByResourceManager :: ResourceManager -> ResourceManagerIO () -> IO ()
+handleByResourceManager resourceManager action =
+  onResourceManager resourceManager do
+    action `catchAll` \ex -> liftIO $ throwToResourceManager resourceManager ex
 
 
 -- * Resource manager implementations
@@ -227,8 +257,8 @@ newUnmanagedRootResourceManagerInternal = liftIO do
             putAsyncVarSTM_ finalExceptionsVar $ toList exceptions
 
 
-withRootResourceManager :: (MonadAwait m, MonadMask m, MonadIO m) => ReaderT ResourceManager IO a -> m a
-withRootResourceManager action = uninterruptibleMask \unmask -> do
+withRootResourceManager :: MonadIO m => ResourceManagerIO a -> m a
+withRootResourceManager action = liftIO $ uninterruptibleMask \unmask -> do
   resourceManager@(RootResourceManager _ _ _ finalExceptionsVar) <- newUnmanagedRootResourceManagerInternal
 
   result <- try $ unmask $ onResourceManager resourceManager action
@@ -476,4 +506,3 @@ linkExecution action = do
       if key == exceptionKey
         then return Nothing
         else throwM ex
-
diff --git a/test/Quasar/Observable/ObservableHashMapSpec.hs b/test/Quasar/Observable/ObservableHashMapSpec.hs
index d18fde0..3a2c6e8 100644
--- a/test/Quasar/Observable/ObservableHashMapSpec.hs
+++ b/test/Quasar/Observable/ObservableHashMapSpec.hs
@@ -1,35 +1,42 @@
 module Quasar.Observable.ObservableHashMapSpec (spec) where
 
-import Quasar.Disposable
-import Quasar.Observable
-import Quasar.Observable.Delta
-import Quasar.Observable.ObservableHashMap qualified as OM
 
 import Control.Monad (void)
 import Data.HashMap.Strict qualified as HM
 import Data.IORef
-import Prelude
+import Quasar.Awaitable
+import Quasar.Disposable
+import Quasar.Observable
+import Quasar.Observable.Delta
+import Quasar.Observable.ObservableHashMap qualified as OM
+import Quasar.Prelude
+import Quasar.ResourceManager
 import Test.Hspec
 
+shouldReturnM :: (Eq a, Show a, MonadIO m) => m a -> a -> m ()
+shouldReturnM action expected = do
+  result <- action
+  liftIO $ result `shouldBe` expected
+
 spec :: Spec
 spec = parallel $ do
-  describe "retrieveIO" $ do
-    it "returns the contents of the map" $ do
-      om <- OM.new :: IO (OM.ObservableHashMap String String)
-      retrieveIO om `shouldReturn` HM.empty
+  describe "retrieve" $ do
+    it "returns the contents of the map" $ io $ withRootResourceManager do
+      om :: OM.ObservableHashMap String String <- OM.new
+      (retrieve om >>= await) `shouldReturnM` HM.empty
       -- Evaluate unit for coverage
       () <- OM.insert "key" "value" om
-      retrieveIO om `shouldReturn` HM.singleton "key" "value"
+      (retrieve om >>= await) `shouldReturnM` HM.singleton "key" "value"
       OM.insert "key2" "value2" om
-      retrieveIO om `shouldReturn` HM.fromList [("key", "value"), ("key2", "value2")]
+      (retrieve om >>= await) `shouldReturnM` HM.fromList [("key", "value"), ("key2", "value2")]
 
   describe "subscribe" $ do
-    it "calls the callback with the contents of the map" $ do
-      lastCallbackValue <- newIORef undefined
+    it "calls the callback with the contents of the map" $ io $ withRootResourceManager do
+      lastCallbackValue <- liftIO $ newIORef impossibleCodePath
 
-      om <- OM.new :: IO (OM.ObservableHashMap String String)
-      subscriptionHandle <- oldObserve om $ writeIORef lastCallbackValue
-      let lastCallbackShouldBe expected = do
+      om :: OM.ObservableHashMap String String <- OM.new
+      subscriptionHandle <- captureDisposable_ $ observe om $ liftIO . writeIORef lastCallbackValue
+      let lastCallbackShouldBe expected = liftIO do
             (ObservableUpdate update) <- readIORef lastCallbackValue
             update `shouldBe` expected
 
@@ -46,12 +53,12 @@ spec = parallel $ do
       lastCallbackShouldBe (HM.fromList [("key", "value"), ("key2", "value2")])
 
   describe "subscribeDelta" $ do
-    it "calls the callback with changes to the map" $ do
-      lastDelta <- newIORef undefined
+    it "calls the callback with changes to the map" $ io $ withRootResourceManager do
+      lastDelta <- liftIO $ newIORef impossibleCodePath
 
-      om <- OM.new :: IO (OM.ObservableHashMap String String)
+      om :: OM.ObservableHashMap String String <- OM.new
       subscriptionHandle <- subscribeDelta om $ writeIORef lastDelta
-      let lastDeltaShouldBe = (readIORef lastDelta `shouldReturn`)
+      let lastDeltaShouldBe = liftIO . (readIORef lastDelta `shouldReturn`)
 
       lastDeltaShouldBe $ Reset HM.empty
       OM.insert "key" "value" om
@@ -73,20 +80,20 @@ spec = parallel $ do
       OM.delete "key2" om
       lastDeltaShouldBe $ Delete "key2"
 
-      OM.lookupDelete "key" om `shouldReturn` Just "changed"
+      OM.lookupDelete "key" om `shouldReturnM` Just "changed"
       lastDeltaShouldBe $ Delete "key"
 
-      retrieveIO om `shouldReturn` HM.singleton "key3" "value3"
+      (retrieve om >>= await) `shouldReturnM` HM.singleton "key3" "value3"
 
   describe "observeKey" $ do
-    it "calls key callbacks with the correct value" $ do
-      value1 <- newIORef undefined
-      value2 <- newIORef undefined
+    it "calls key callbacks with the correct value" $ io $ withRootResourceManager do
+      value1 <- liftIO $ newIORef undefined
+      value2 <- liftIO $ newIORef undefined
 
-      om <- OM.new :: IO (OM.ObservableHashMap String String)
+      om :: OM.ObservableHashMap String String <- OM.new
 
-      void $ oldObserve (OM.observeKey "key1" om) (writeIORef value1)
-      let v1ShouldBe expected = do
+      void $ observe (OM.observeKey "key1" om) (liftIO . writeIORef value1)
+      let v1ShouldBe expected = liftIO do
             (ObservableUpdate update) <- readIORef value1
             update `shouldBe` expected
 
@@ -98,8 +105,8 @@ spec = parallel $ do
       OM.insert "key2" "value2" om
       v1ShouldBe $ Just "value1"
 
-      handle2 <- oldObserve (OM.observeKey "key2" om) (writeIORef value2)
-      let v2ShouldBe expected = do
+      handle2 <- captureDisposable_ $ observe (OM.observeKey "key2" om) (liftIO . writeIORef value2)
+      let v2ShouldBe expected = liftIO do
             (ObservableUpdate update) <- readIORef value2
             update `shouldBe` expected
 
@@ -119,15 +126,15 @@ spec = parallel $ do
       v1ShouldBe $ Nothing
       v2ShouldBe $ Just "changed"
 
-      retrieveIO om `shouldReturn` HM.singleton "key2" "changed"
+      (retrieve om >>= await) `shouldReturnM` HM.singleton "key2" "changed"
       dispose handle2
 
-      OM.lookupDelete "key2" om `shouldReturn` Just "changed"
+      OM.lookupDelete "key2" om `shouldReturnM` Just "changed"
       v2ShouldBe $ Just "changed"
 
-      OM.lookupDelete "key2" om `shouldReturn` Nothing
+      OM.lookupDelete "key2" om `shouldReturnM` Nothing
 
-      OM.lookupDelete "key1" om `shouldReturn` Nothing
+      OM.lookupDelete "key1" om `shouldReturnM` Nothing
       v1ShouldBe $ Nothing
 
-      retrieveIO om `shouldReturn` HM.empty
+      (retrieve om >>= await) `shouldReturnM` HM.empty
diff --git a/test/Quasar/Observable/ObservablePrioritySpec.hs b/test/Quasar/Observable/ObservablePrioritySpec.hs
index 39bf3c0..a09c5e8 100644
--- a/test/Quasar/Observable/ObservablePrioritySpec.hs
+++ b/test/Quasar/Observable/ObservablePrioritySpec.hs
@@ -1,39 +1,46 @@
 module Quasar.Observable.ObservablePrioritySpec (spec) where
 
+import Control.Monad (void)
+import Data.IORef
+import Quasar.Awaitable
 import Quasar.Disposable
 import Quasar.Observable
 import Quasar.Observable.ObservablePriority (ObservablePriority)
 import Quasar.Observable.ObservablePriority qualified as OP
-
-import Control.Monad (void)
-import Data.IORef
-import Prelude
+import Quasar.ResourceManager
+import Quasar.Prelude
 import Test.Hspec
 
 
+shouldReturnM :: (Eq a, Show a, MonadIO m) => m a -> a -> m ()
+shouldReturnM action expected = do
+  result <- action
+  liftIO $ result `shouldBe` expected
+
+
 spec :: Spec
 spec = do
   describe "ObservablePriority" $ parallel $ do
-    it "can be created" $ do
+    it "can be created" $ io do
       void $ OP.create
-    specify "retrieveIO returns the value with the highest priority" $ do
-      (op :: ObservablePriority Int String) <- OP.create
+    specify "retrieveIO returns the value with the highest priority" $ io $ withRootResourceManager do
+      op :: ObservablePriority Int String <- OP.create
       p2 <- OP.insertValue op 2 "p2"
-      retrieveIO op `shouldReturn` Just "p2"
+      (retrieve op >>= await) `shouldReturnM` Just "p2"
       p1 <- OP.insertValue op 1 "p1"
-      retrieveIO op `shouldReturn` Just "p2"
+      (retrieve op >>= await) `shouldReturnM` Just "p2"
       dispose p2
-      retrieveIO op `shouldReturn` Just "p1"
+      (retrieve op >>= await) `shouldReturnM` Just "p1"
       dispose p1
-      retrieveIO op `shouldReturn` Nothing
-    it "sends updates when its value changes" $ do
-      result <- newIORef []
-      let mostRecentShouldBe expected = do
+      (retrieve op >>= await) `shouldReturnM` Nothing
+    it "sends updates when its value changes" $ io $ withRootResourceManager do
+      result <- liftIO $ newIORef []
+      let mostRecentShouldBe expected = liftIO do
             (ObservableUpdate x) <- (head <$> readIORef result)
             x `shouldBe` expected
 
-      (op :: ObservablePriority Int String) <- OP.create
-      _s <- oldObserve op (modifyIORef result . (:))
+      op :: ObservablePriority Int String <- OP.create
+      _s <- observe op (liftIO . modifyIORef result . (:))
       mostRecentShouldBe Nothing
       p2 <- OP.insertValue op 2 "p2"
 
@@ -45,4 +52,4 @@ spec = do
       dispose p1
       mostRecentShouldBe Nothing
 
-      length <$> readIORef result `shouldReturn` 4
+      liftIO $ length <$> readIORef result `shouldReturn` 4
diff --git a/test/Quasar/ObservableSpec.hs b/test/Quasar/ObservableSpec.hs
index f93557c..0df6d1d 100644
--- a/test/Quasar/ObservableSpec.hs
+++ b/test/Quasar/ObservableSpec.hs
@@ -2,6 +2,7 @@ module Quasar.ObservableSpec (spec) where
 
 import Data.IORef
 import Quasar.Prelude
+import Quasar.Awaitable
 import Quasar.Observable
 import Quasar.ResourceManager
 import Test.Hspec
@@ -26,29 +27,29 @@ observableSpec = parallel do
 mergeObservableSpec :: Spec
 mergeObservableSpec = do
   describe "mergeObservable" $ parallel $ do
-    it "merges correctly using retrieveIO" $ do
+    it "merges correctly using retrieveIO" $ io $ withRootResourceManager do
       a <- newObservableVar ""
       b <- newObservableVar ""
 
-      let mergedObservable = mergeObservable (,) a b
-      let latestShouldBe = (retrieveIO mergedObservable `shouldReturn`)
+      let mergedObservable = liftA2 (,) (toObservable a) (toObservable b)
+      let latestShouldBe val = retrieve mergedObservable >>= await >>= liftIO . (`shouldBe` val)
 
       testSequence a b latestShouldBe
 
-    it "merges correctly using observe" $ do
+    it "merges correctly using observe" $ io $ withRootResourceManager do
       a <- newObservableVar ""
       b <- newObservableVar ""
 
-      let mergedObservable = mergeObservable (,) a b
-      (latestRef :: IORef (ObservableMessage (String, String))) <- newIORef (ObservableUpdate ("", ""))
-      void $ oldObserve mergedObservable (writeIORef latestRef)
-      let latestShouldBe expected = do
+      let mergedObservable = liftA2 (,) (toObservable a) (toObservable b)
+      (latestRef :: IORef (ObservableMessage (String, String))) <- liftIO $ newIORef (ObservableUpdate ("", ""))
+      void $ observe mergedObservable (liftIO . writeIORef latestRef)
+      let latestShouldBe expected = liftIO do
             (ObservableUpdate x) <- readIORef latestRef
             x `shouldBe` expected
 
       testSequence a b latestShouldBe
   where
-    testSequence :: ObservableVar String -> ObservableVar String -> ((String, String) -> IO ()) -> IO ()
+    testSequence :: ObservableVar String -> ObservableVar String -> ((String, String) -> ResourceManagerIO ()) -> ResourceManagerIO ()
     testSequence a b latestShouldBe = do
       latestShouldBe ("", "")
 
-- 
GitLab