diff --git a/src/Quasar/Core.hs b/src/Quasar/Core.hs
index a92b890f6e80e7cd0c51f7fa10836d4203ec0a12..a8375166ef2c4451bb33dea0a5593ff620e87161 100644
--- a/src/Quasar/Core.hs
+++ b/src/Quasar/Core.hs
@@ -24,7 +24,6 @@ module Quasar.Core (
   -- * Disposable
   IsDisposable(..),
   disposeIO,
-  disposeEventually,
   Disposable,
   mkDisposable,
   synchronousDisposable,
@@ -42,7 +41,7 @@ class IsAsync r a | a -> r where
   wait :: a -> IO r
   wait x = do
     mvar <- newEmptyMVar
-    onResult_ (void . tryPutMVar mvar . Left) x (resultCallback mvar)
+    onResult_ x (void . tryPutMVar mvar . Left) (resultCallback mvar)
     readMVar mvar >>= either throwIO pure
     where
       resultCallback :: MVar (Either SomeException r) -> Either SomeException r -> IO ()
@@ -57,17 +56,17 @@ class IsAsync r a | a -> r where
   --
   -- The returned `Disposable` can be used to deregister the callback.
   onResult
-    :: (SomeException -> IO ())
-    -- ^ callback exception handler
-    -> a
+    :: a
     -- ^ async
+    -> (SomeException -> IO ())
+    -- ^ callback exception handler
     -> (Either SomeException r -> IO ())
     -- ^ callback
     -> IO Disposable
 
   onResult_
-    :: (SomeException -> IO ())
-    -> a
+    :: a
+    -> (SomeException -> IO ())
     -> (Either SomeException r -> IO ())
     -> IO ()
   onResult_ x y = void . onResult x y
@@ -80,16 +79,16 @@ data Async r = forall a. IsAsync r a => SomeAsync a
 
 instance IsAsync r (Async r) where
   wait (SomeAsync x) = wait x
-  onResult y (SomeAsync x) = onResult y x
-  onResult_ y (SomeAsync x) = onResult_ y x
+  onResult (SomeAsync x) y = onResult x y
+  onResult_ (SomeAsync x) y = onResult_ x y
   peekAsync (SomeAsync x) = peekAsync x
-  toAsync = id
+
 
 
 newtype CompletedAsync r = CompletedAsync (Either SomeException r)
 instance IsAsync r (CompletedAsync r) where
   wait (CompletedAsync value) = either throwIO pure value
-  onResult callbackExceptionHandler (CompletedAsync value) callback = noDisposable <$ (callback value `catch` callbackExceptionHandler)
+  onResult (CompletedAsync value) callbackExceptionHandler callback = noDisposable <$ (callback value `catch` callbackExceptionHandler)
   peekAsync (CompletedAsync value) = pure $ Just value
 
 completedAsync :: Either SomeException r -> Async r
@@ -104,45 +103,88 @@ failedAsync = completedAsync . Left
 
 -- * AsyncIO
 
-newtype AsyncIO r = AsyncIO (IO (Async r))
+data AsyncIO r
+  = AsyncIOSuccess r
+  | AsyncIOFailure SomeException
+  | AsyncIOIO (IO r)
+  | AsyncIOAsync (Async r)
+  | AsyncIOPlumbing (IO (AsyncIO r))
 
 instance Functor AsyncIO where
-  fmap f = (pure . f =<<)
+  fmap fn (AsyncIOSuccess x) = AsyncIOSuccess (fn x)
+  fmap _ (AsyncIOFailure x) = AsyncIOFailure x
+  fmap fn (AsyncIOIO x) = AsyncIOIO (fn <$> x)
+  fmap fn (AsyncIOAsync x) = bindAsync x (pure . fn)
+  fmap fn (AsyncIOPlumbing x) = AsyncIOPlumbing (fn <<$>> x)
 
 instance Applicative AsyncIO where
-  pure = await . successfulAsync
+  pure = AsyncIOSuccess
   (<*>) pf px = pf >>= \f -> f <$> px
   liftA2 f px py = px >>= \x -> f x <$> py
 instance Monad AsyncIO where
   (>>=) :: forall a b. AsyncIO a -> (a -> AsyncIO b) -> AsyncIO b
-  lhs >>= fn = AsyncIO $ newAsyncVar >>= go
-    where
-      go resultVar = do
-        lhsAsync <- async lhs
-        lhsAsync `onResultBound` \case
-          Right lhsResult ->  do
-            rhsAsync <- async $ fn lhsResult
-            rhsAsync `onResultBound` putAsyncVarEither resultVar
-          Left lhsEx -> putAsyncVarEither resultVar (Left lhsEx)
-        pure $ toAsync resultVar
-        where
-          onResultBound :: forall r. Async r -> (Either SomeException r -> IO ()) -> IO ()
-          onResultBound = onResult_ (putAsyncVarEither resultVar . Left)
+  (>>=) (AsyncIOSuccess x) fn = fn x
+  (>>=) (AsyncIOFailure x) _ = AsyncIOFailure x
+  (>>=) (AsyncIOIO x) fn = AsyncIOPlumbing $ either AsyncIOFailure fn <$> try x
+  (>>=) (AsyncIOAsync x) fn = bindAsync x fn
+  (>>=) (AsyncIOPlumbing x) fn = AsyncIOPlumbing $ (>>= fn) <$> x
 
 instance MonadIO AsyncIO where
-  liftIO = AsyncIO . fmap successfulAsync
+  liftIO = AsyncIOIO
+
+instance MonadThrow AsyncIO where
+  throwM = AsyncIOFailure . toException
+
+instance MonadCatch AsyncIO where
+  catch :: Exception e => AsyncIO a -> (e -> AsyncIO a) -> AsyncIO a
+  catch x@(AsyncIOSuccess _) _ = x
+  catch x@(AsyncIOFailure ex) handler = maybe x handler (fromException ex)
+  catch (AsyncIOIO x) handler = AsyncIOIO (try x) >>= handleEither handler
+  catch (AsyncIOAsync x) handler = bindAsyncCatch x (handleEither handler)
+  catch (AsyncIOPlumbing x) handler = AsyncIOPlumbing $ x >>= pure . (`catch` handler)
+
+handleEither :: Exception e => (e -> AsyncIO a) -> Either SomeException a -> AsyncIO a
+handleEither handler (Left ex) = maybe (AsyncIOFailure ex) handler (fromException ex)
+handleEither _ (Right r) = pure r
+
+bindAsync :: forall a b. Async a -> (a -> AsyncIO b) -> AsyncIO b
+bindAsync x fn = bindAsyncCatch x (either (AsyncIOFailure) fn)
+
+bindAsyncCatch :: forall a b. Async a -> (Either SomeException a -> AsyncIO b) -> AsyncIO b
+bindAsyncCatch x fn = AsyncIOPlumbing $ newAsyncVar >>= bindAsync'
+  where
+    bindAsync' resultVar = do
+      withResult x resultVar step
+      pure $ await resultVar
+    step :: (Either SomeException b -> IO ()) -> Either SomeException a -> IO ()
+    step put = putAsyncIOResult put . fn
+
+withResult :: Async a -> AsyncVar b -> ((Either SomeException b -> IO ()) -> Either SomeException a -> IO ()) -> IO ()
+withResult x var fn = onResult_ x (failAsyncVar var) (fn (putAsyncVarEither var))
+
+putAsyncIOResult :: (Either SomeException a -> IO ()) -> AsyncIO a -> IO ()
+putAsyncIOResult put (AsyncIOSuccess x) = put (Right x)
+putAsyncIOResult put (AsyncIOFailure x) = put (Left x)
+putAsyncIOResult put (AsyncIOIO x) = try x >>= put
+putAsyncIOResult put (AsyncIOAsync x) = onResult_ x (put . Left) put
+putAsyncIOResult put (AsyncIOPlumbing x) = x >>= putAsyncIOResult put
+
 
 
 -- | Run the synchronous part of an `AsyncIO` and then return an `Async` that can be used to wait for completion of the synchronous part.
-async :: MonadIO m => AsyncIO r -> m (Async r)
-async (AsyncIO x) = liftIO x
+async :: AsyncIO r -> AsyncIO (Async r)
+async = fmap successfulAsync
 
 await :: IsAsync r a => a -> AsyncIO r
-await = AsyncIO . pure . toAsync
+await = AsyncIOAsync . toAsync
 
 -- | Run an `AsyncIO` to completion and return the result.
 runAsyncIO :: AsyncIO r -> IO r
-runAsyncIO = async >=> wait
+runAsyncIO (AsyncIOSuccess x) = pure x
+runAsyncIO (AsyncIOFailure x) = throwIO x
+runAsyncIO (AsyncIOIO x) = x
+runAsyncIO (AsyncIOAsync x) = wait x
+runAsyncIO (AsyncIOPlumbing x) = x >>= runAsyncIO -- TODO error handling
 
 awaitResult :: AsyncIO (Async r) -> AsyncIO r
 awaitResult = (await =<<)
@@ -154,7 +196,6 @@ mapAsync :: (a -> b) -> Async a -> AsyncIO (Async b)
 mapAsync fn = async . fmap fn . await
 
 
-
 -- ** Forking asyncs
 
 -- TODO
@@ -176,8 +217,8 @@ instance IsAsync r (AsyncVar r) where
     AsyncVarCompleted x -> Just x
     AsyncVarOpen _ -> Nothing
 
-  onResult :: (SomeException -> IO ()) -> AsyncVar r -> (Either SomeException r -> IO ()) -> IO Disposable
-  onResult callbackExceptionHandler (AsyncVar mvar) callback =
+  onResult :: AsyncVar r -> (SomeException -> IO ()) -> (Either SomeException r -> IO ()) -> IO Disposable
+  onResult (AsyncVar mvar) callbackExceptionHandler callback =
     modifyMVar mvar $ \case
       AsyncVarOpen callbacks -> do
         key <- newUnique
@@ -194,7 +235,10 @@ newAsyncVar :: MonadIO m => m (AsyncVar r)
 newAsyncVar = liftIO $ AsyncVar <$> newMVar (AsyncVarOpen HM.empty)
 
 putAsyncVar :: MonadIO m => AsyncVar a -> a -> m ()
-putAsyncVar asyncVar = putAsyncVarEither asyncVar . Right
+putAsyncVar var = putAsyncVarEither var . Right
+
+failAsyncVar :: MonadIO m => AsyncVar a -> SomeException -> m ()
+failAsyncVar var = putAsyncVarEither var . Left
 
 putAsyncVarEither :: MonadIO m => AsyncVar a -> Either SomeException a -> m ()
 putAsyncVarEither (AsyncVar mvar) value = liftIO $ do
@@ -226,10 +270,6 @@ class IsDisposable a where
 disposeIO :: IsDisposable a => a -> IO ()
 disposeIO = runAsyncIO . dispose
 
--- | Dispose a resource. Returns without waiting for the resource to be released if possible.
-disposeEventually :: IsDisposable a => a -> IO ()
-disposeEventually = void . async . dispose
-
 instance IsDisposable a => IsDisposable (Maybe a) where
   dispose = mapM_ dispose
 
diff --git a/src/Quasar/Observable.hs b/src/Quasar/Observable.hs
index 9f7cd4299e457a0ae4aa4af3f229a1e14524253b..e77f8cfa9e2acd765383a1c82efd140cb9256e55 100644
--- a/src/Quasar/Observable.hs
+++ b/src/Quasar/Observable.hs
@@ -187,11 +187,9 @@ instance forall o i v. (IsObservable i o, IsObservable v i) => IsObservable v (J
         outerCallback :: MVar Disposable -> ObservableMessage i -> IO ()
         outerCallback innerDisposableMVar (_reason, innerObservable) = do
           oldInnerSubscription <- takeMVar innerDisposableMVar
-          void $ async $ do
-            dispose oldInnerSubscription
-            liftIO $ do
-              newInnerSubscription <- subscribe innerObservable callback
-              putMVar innerDisposableMVar newInnerSubscription
+          disposeIO oldInnerSubscription
+          newInnerSubscription <- subscribe innerObservable callback
+          putMVar innerDisposableMVar newInnerSubscription
 
 joinObservable :: (IsObservable i o, IsObservable v i) => o -> Observable v
 joinObservable = Observable . JoinedObservable
diff --git a/test/Quasar/AsyncSpec.hs b/test/Quasar/AsyncSpec.hs
index 8c5a1f71b2a7a426ebad775be58347cf3fa0decd..15bad2b20fc5ffdc24826c6d8ca565e10b5693a2 100644
--- a/test/Quasar/AsyncSpec.hs
+++ b/test/Quasar/AsyncSpec.hs
@@ -2,6 +2,7 @@ module Quasar.AsyncSpec (spec) where
 
 import Control.Concurrent
 import Control.Exception (throwIO)
+import Control.Monad (void)
 import Control.Monad.IO.Class
 import Data.Either (isRight)
 import Prelude
@@ -26,7 +27,7 @@ spec = parallel $ do
       avar <- newAsyncVar :: IO (AsyncVar ())
 
       mvar <- newEmptyMVar
-      onResult_ throwIO avar (putMVar mvar)
+      onResult_ avar throwIO (putMVar mvar)
 
       (() <$) <$> tryTakeMVar mvar `shouldReturn` Nothing
 
@@ -46,3 +47,27 @@ spec = parallel $ do
 
     it "can continue after awaiting an already finished operation" $ do
       runAsyncIO (await =<< async (pure 42 :: AsyncIO Int)) `shouldReturn` 42
+
+    it "can fmap the result of an already finished async" $ do
+      avar <- newAsyncVar :: IO (AsyncVar ())
+      putAsyncVar avar ()
+      runAsyncIO (id <$> await avar)
+
+    it "can fmap the result of an async that is completed later" $ do
+      avar <- newAsyncVar :: IO (AsyncVar ())
+      void $ forkIO $ do
+        threadDelay 100000
+        putAsyncVar avar ()
+      runAsyncIO (id <$> await avar)
+
+    it "can bind the result of an already finished async" $ do
+      avar <- newAsyncVar :: IO (AsyncVar ())
+      putAsyncVar avar ()
+      runAsyncIO (await avar >>= pure)
+
+    it "can bind the result of an async that is completed later" $ do
+      avar <- newAsyncVar :: IO (AsyncVar ())
+      void $ forkIO $ do
+        threadDelay 100000
+        putAsyncVar avar ()
+      runAsyncIO (await avar >>= pure)