diff --git a/src/Quasar/Observable.hs b/src/Quasar/Observable.hs
index a529e8a28d69e65e8071aea7b5f7c99c0cbdd1b9..f119cd6097e4e88a59a2a592e1058b128987b106 100644
--- a/src/Quasar/Observable.hs
+++ b/src/Quasar/Observable.hs
@@ -1,4 +1,5 @@
 {-# LANGUAGE UndecidableInstances #-}
+{-# LANGUAGE ViewPatterns #-}
 
 module Quasar.Observable (
   -- * Observable core types
@@ -28,8 +29,10 @@ module Quasar.Observable (
   ObservableCallback,
 ) where
 
+import Control.Applicative
 import Control.Concurrent.MVar
 import Control.Concurrent.STM
+import Control.Monad.Catch
 import Control.Monad.Except
 import Control.Monad.Reader
 import Control.Monad.Trans.Maybe
@@ -45,14 +48,14 @@ import System.IO (fixIO)
 
 data ObservableMessage a
   = ObservableUpdate a
-  | ObservableConnecting
+  | ObservableLoading
   | ObservableReconnecting SomeException
   | ObservableNotAvailable SomeException
   deriving stock (Show, Generic)
 
 instance Functor ObservableMessage where
   fmap fn (ObservableUpdate x) = ObservableUpdate (fn x)
-  fmap _ ObservableConnecting = ObservableConnecting
+  fmap _ ObservableLoading = ObservableLoading
   fmap _ (ObservableReconnecting ex) = ObservableReconnecting ex
   fmap _ (ObservableNotAvailable ex) = ObservableNotAvailable ex
 
@@ -62,8 +65,8 @@ instance Applicative ObservableMessage where
   liftA2 _ _ (ObservableNotAvailable ex) = ObservableNotAvailable ex
   liftA2 _ (ObservableReconnecting ex) _ = ObservableReconnecting ex
   liftA2 _ _ (ObservableReconnecting ex) = ObservableReconnecting ex
-  liftA2 _ ObservableConnecting _ = ObservableConnecting
-  liftA2 _ _ ObservableConnecting = ObservableConnecting
+  liftA2 _ ObservableLoading _ = ObservableLoading
+  liftA2 _ _ ObservableLoading = ObservableLoading
   liftA2 fn (ObservableUpdate x) (ObservableUpdate y) = ObservableUpdate (fn x y)
 
 
@@ -113,11 +116,30 @@ instance IsObservable v (Observable v) where
 
 instance Functor Observable where
   fmap f = mapObservable f
+
 instance Applicative Observable where
-  pure = constObservable
-  liftA2 = mergeObservable
+  pure = toObservable . ConstObservable
+  liftA2 fn x y = toObservable $ MergedObservable fn x y
+
 instance Monad Observable where
-  (>>=) = bindObservable
+  x >>= y = toObservable $ BindObservable x y
+
+instance MonadThrow Observable where
+  throwM :: forall e v. Exception e => e -> Observable v
+  throwM = toObservable . FailedObservable @v . toException
+
+instance MonadCatch Observable where
+  catch action handler = toObservable $ CatchObservable action handler
+
+instance MonadFail Observable where
+  fail = throwM . userError
+
+instance Alternative Observable where
+  empty = fail "empty"
+  x <|> y = x `catchAll` const y
+
+instance MonadPlus Observable
+
 
 
 data MappedObservable b = forall a o. IsObservable a o => MappedObservable (a -> b) o
@@ -128,6 +150,146 @@ instance IsObservable v (MappedObservable v) where
   mapObservable f1 (MappedObservable f2 upstream) = Observable $ MappedObservable (f1 . f2) upstream
 
 
+
+data BindObservable r = forall a. BindObservable (Observable a) (a -> Observable r)
+
+instance IsRetrievable r (BindObservable r) where
+  retrieve (BindObservable fx fn) = async $ do
+    x <- awaitResult $ retrieve fx
+    awaitResult $ retrieve $ fn x
+
+instance IsObservable r (BindObservable r) where
+  observe :: forall r. (BindObservable r) -> (ObservableMessage r -> IO ()) -> IO Disposable
+  observe (BindObservable fx fn) callback = do
+    -- Create a resource manager to ensure all subscriptions are cleaned up when disposing.
+    resourceManager <- newResourceManager unlimitedResourceManagerConfiguration
+
+    isDisposingVar <- newTVarIO False
+    disposableVar <- newTMVarIO noDisposable
+    keyVar <- newTMVarIO Nothing
+
+    leftDisposable <- observe fx (outerCallback resourceManager isDisposingVar disposableVar keyVar)
+
+    attachDisposeAction_ resourceManager $ do
+      atomically $ writeTVar isDisposingVar True
+      d1 <- dispose leftDisposable
+      -- Block while the `outerCallback` is running
+      d2 <- dispose =<< atomically (takeTMVar disposableVar)
+      pure (d1 <> d2)
+
+    pure $ toDisposable resourceManager
+    where
+      outerCallback resourceManager isDisposingVar disposableVar keyVar observableMessage = mask $ \unmask -> do
+        key <- newUnique
+
+        join $ atomically $ do
+          readTVar isDisposingVar >>= \case
+            False -> do
+              -- Blocks while an inner callback is running
+              void $ swapTMVar keyVar (Just key)
+
+              oldDisposable <- takeTMVar disposableVar
+
+              -- IO action that will run after the STM transaction
+              pure $ do
+                disposeEventually resourceManager oldDisposable
+
+                newDisposable <-
+                  unmask (outerMessageHandler key observableMessage)
+                    `onException`
+                      atomically (putTMVar disposableVar noDisposable)
+
+                atomically $ putTMVar disposableVar newDisposable
+
+            -- When already disposing no new handlers should be registered
+            True -> pure $ pure ()
+
+        where
+          outerMessageHandler key (ObservableUpdate x) = observe (fn x) (innerCallback key)
+          outerMessageHandler key (ObservableLoading) = noDisposable <$ callback ObservableLoading
+          outerMessageHandler key (ObservableReconnecting ex) = noDisposable <$ callback (ObservableReconnecting ex)
+          outerMessageHandler key (ObservableNotAvailable ex) = noDisposable <$ callback (ObservableNotAvailable ex)
+
+          innerCallback :: Unique -> ObservableMessage r -> IO ()
+          innerCallback key x = do
+            bracket
+              -- Take key var to prevent parallel callbacks
+              (atomically $ takeTMVar keyVar)
+              -- Put key back
+              (atomically . putTMVar keyVar)
+              -- Call callback when key is still valid
+              (\currentKey -> when (Just key == currentKey) $ callback x)
+
+
+
+data CatchObservable e r = Exception e => CatchObservable (Observable r) (e -> Observable r)
+
+instance IsRetrievable r (CatchObservable e r) where
+  retrieve (CatchObservable fx fn) = async $
+    awaitResult (retrieve fx) `catch` \ex -> awaitResult (retrieve (fn ex))
+
+instance IsObservable r (CatchObservable e r) where
+  observe :: forall e r. (CatchObservable e r) -> (ObservableMessage r -> IO ()) -> IO Disposable
+  observe (CatchObservable fx fn) callback = do
+    -- Create a resource manager to ensure all subscriptions are cleaned up when disposing.
+    resourceManager <- newResourceManager unlimitedResourceManagerConfiguration
+
+    isDisposingVar <- newTVarIO False
+    disposableVar <- newTMVarIO noDisposable
+    keyVar <- newTMVarIO Nothing
+
+    leftDisposable <- observe fx (outerCallback resourceManager isDisposingVar disposableVar keyVar)
+
+    attachDisposeAction_ resourceManager $ do
+      atomically $ writeTVar isDisposingVar True
+      d1 <- dispose leftDisposable
+      -- Block while the `outerCallback` is running
+      d2 <- dispose =<< atomically (takeTMVar disposableVar)
+      pure (d1 <> d2)
+
+    pure $ toDisposable resourceManager
+    where
+      outerCallback resourceManager isDisposingVar disposableVar keyVar observableMessage = mask $ \unmask -> do
+        key <- newUnique
+
+        join $ atomically $ do
+          readTVar isDisposingVar >>= \case
+            False -> do
+              -- Blocks while an inner callback is running
+              void $ swapTMVar keyVar (Just key)
+
+              oldDisposable <- takeTMVar disposableVar
+
+              -- IO action that will run after the STM transaction
+              pure $ do
+                disposeEventually resourceManager oldDisposable
+
+                newDisposable <-
+                  unmask (outerMessageHandler key observableMessage)
+                    `onException`
+                      atomically (putTMVar disposableVar noDisposable)
+
+                atomically $ putTMVar disposableVar newDisposable
+
+            -- When already disposing no new handlers should be registered
+            True -> pure $ pure ()
+
+        where
+          outerMessageHandler key msg@(ObservableNotAvailable (fromException -> Just ex)) = observe (fn ex) (innerCallback key)
+          outerMessageHandler key msg = noDisposable <$ callback msg
+
+          innerCallback :: Unique -> ObservableMessage r -> IO ()
+          innerCallback key x = do
+            bracket
+              -- Take key var to prevent parallel callbacks
+              (atomically $ takeTMVar keyVar)
+              -- Put key back
+              (atomically . putTMVar keyVar)
+              -- Call callback when key is still valid
+              (\currentKey -> when (Just key == currentKey) $ callback x)
+
+
+
 newtype ObservableVar v = ObservableVar (MVar (v, HM.HashMap Unique (ObservableCallback v)))
 instance IsRetrievable v (ObservableVar v) where
   retrieve (ObservableVar mvar) = liftIO $ successfulTask . fst <$> readMVar mvar
@@ -173,68 +335,16 @@ withObservableVar (ObservableVar mvar) f = withMVar mvar (f . fst)
 
 
 bindObservable :: (IsObservable a ma, IsObservable b mb) => ma -> (a -> mb) -> Observable b
-bindObservable x fy = joinObservable $ mapObservable fy x
-
-
--- | Internal state of `JoinedObservable`
-data JoinedObservableState
-  = JoinedObservableInactive
-  | JoinedObservableActive Unique Disposable
-  | JoinedObservableDisposed
-
-instance IsDisposable JoinedObservableState where
-  dispose JoinedObservableInactive = pure $ successfulAwaitable ()
-  dispose (JoinedObservableActive _ disposable) = dispose disposable
-  dispose JoinedObservableDisposed = pure $ successfulAwaitable ()
-
-newtype JoinedObservable o = JoinedObservable o
-instance forall v o i. (IsRetrievable i o, IsRetrievable v i) => IsRetrievable v (JoinedObservable o) where
-  retrieve :: HasResourceManager m => JoinedObservable o -> m (Task v)
-  retrieve (JoinedObservable outer) = async $ await =<< retrieve =<< await =<< retrieve outer
-instance forall v o i. (IsObservable i o, IsObservable v i) => IsObservable v (JoinedObservable o) where
-  observe :: JoinedObservable o -> (ObservableMessage v -> IO ()) -> IO Disposable
-  observe (JoinedObservable outer) callback = do
-    -- Create a resource manager to ensure all subscriptions are cleaned up when disposing.
-    resourceManager <- newResourceManager unlimitedResourceManagerConfiguration
-
-    stateMVar <- newMVar JoinedObservableInactive
-    outerDisposable <- observe outer (outerCallback resourceManager stateMVar)
-
-    attachDisposeAction_ resourceManager $ do
-      d1 <- dispose outerDisposable
-      d2 <- modifyMVar stateMVar $ \state -> do
-        d2temp <- dispose state
-        pure (JoinedObservableDisposed, d2temp)
-      pure $ d1 <> d2
-
-    pure $ toDisposable resourceManager
-      where
-        outerCallback :: ResourceManager -> MVar JoinedObservableState -> ObservableMessage i -> IO ()
-        outerCallback resourceManager stateMVar message = do
-          oldState <- takeMVar stateMVar
-          disposeEventually resourceManager oldState
-          key <- newUnique
-          newState <- outerCallbackObserve key message
-          putMVar stateMVar newState
-          where
-            outerCallbackObserve :: Unique -> ObservableMessage i -> IO JoinedObservableState
-            outerCallbackObserve key (ObservableUpdate innerObservable) = JoinedObservableActive key <$> observe innerObservable (filteredCallback key)
-            outerCallbackObserve _ ObservableConnecting = JoinedObservableInactive <$ callback ObservableConnecting
-            outerCallbackObserve _ (ObservableReconnecting ex) = JoinedObservableInactive <$ callback (ObservableReconnecting ex)
-            outerCallbackObserve _ (ObservableNotAvailable ex) = JoinedObservableInactive <$ callback (ObservableNotAvailable ex)
-            filteredCallback :: Unique -> ObservableMessage v -> IO ()
-            filteredCallback key msg =
-              -- TODO write a version that does not deadlock when `observe` calls the callback directly
-              undefined
-              withMVar stateMVar $ \case
-                JoinedObservableActive activeKey _ -> callback msg
-                _ -> pure ()
+bindObservable fx fn = (toObservable fx) >>= \x -> toObservable (fn x)
 
 joinObservable :: (IsObservable i o, IsObservable v i) => o -> Observable v
-joinObservable = Observable . JoinedObservable
-
+joinObservable = join . fmap toObservable . toObservable
 
 
+-- | Merge two observables using a given merge function. Whenever one of the inputs is updated, the resulting
+-- observable updates according to the merge function.
+--
+-- There is no caching involed, every subscriber effectively subscribes to both input observables.
 data MergedObservable r o0 v0 o1 v1 = MergedObservable (v0 -> v1 -> r) o0 o1
 instance forall r o0 v0 o1 v1. (IsRetrievable v0 o0, IsRetrievable v1 o1) => IsRetrievable r (MergedObservable r o0 v0 o1 v1) where
   retrieve (MergedObservable merge obs0 obs1) = liftA2 (liftA2 merge) (retrieve obs0) (retrieve obs1)
@@ -256,7 +366,10 @@ instance forall r o0 v0 o1 v1. (IsObservable v0 o0, IsObservable v1 o1) => IsObs
         mapM_ callback mMerged
 
 
--- | Merge two observables using a given merge function. Whenever one of the inputs is updated, the resulting observable updates according to the merge function.
+-- | Merge two observables using a given merge function. Whenever one of the inputs is updated, the resulting
+-- observable updates according to the merge function.
+--
+-- Behaves like `liftA2` on `Observable` but accepts anything that implements `IsObservable`..
 --
 -- There is no caching involed, every subscriber effectively subscribes to both input observables.
 mergeObservable :: (IsObservable v0 o0, IsObservable v1 o1) => (v0 -> v1 -> r) -> o0 -> o1 -> Observable r
@@ -296,14 +409,19 @@ synchronousFnObservable observeFn synchronousRetrieveFn = fnObservable observeFn
 newtype ConstObservable v = ConstObservable v
 instance IsRetrievable v (ConstObservable v) where
   retrieve (ConstObservable x) = pure $ pure x
-instance IsObservable a (ConstObservable a) where
+instance IsObservable v (ConstObservable v) where
   observe (ConstObservable x) callback = do
     callback $ ObservableUpdate x
     pure noDisposable
 
--- | Create an observable that contains a constant value.
-constObservable :: v -> Observable v
-constObservable = Observable . ConstObservable
+
+newtype FailedObservable v = FailedObservable SomeException
+instance IsRetrievable v (FailedObservable v) where
+  retrieve (FailedObservable ex) = liftIO $ throwIO ex
+instance IsObservable v (FailedObservable v) where
+  observe (FailedObservable ex) callback = do
+    callback $ ObservableNotAvailable ex
+    pure noDisposable
 
 
 -- | Create an observable by simply running an IO action whenever a value is requested or a callback is registered.