From fe58ebb7ffd4095d288818707685d35bd60159c7 Mon Sep 17 00:00:00 2001 From: Jens Nolte <git@queezle.net> Date: Fri, 23 Jul 2021 17:59:20 +0200 Subject: [PATCH] Refactor AsyncIO and implement exception handling Co-authored-by: Jan Beinke <git@janbeinke.com> --- src/Quasar/Core.hs | 120 ++++++++++++++++++++++++++------------- src/Quasar/Observable.hs | 8 +-- test/Quasar/AsyncSpec.hs | 27 ++++++++- 3 files changed, 109 insertions(+), 46 deletions(-) diff --git a/src/Quasar/Core.hs b/src/Quasar/Core.hs index a92b890..a837516 100644 --- a/src/Quasar/Core.hs +++ b/src/Quasar/Core.hs @@ -24,7 +24,6 @@ module Quasar.Core ( -- * Disposable IsDisposable(..), disposeIO, - disposeEventually, Disposable, mkDisposable, synchronousDisposable, @@ -42,7 +41,7 @@ class IsAsync r a | a -> r where wait :: a -> IO r wait x = do mvar <- newEmptyMVar - onResult_ (void . tryPutMVar mvar . Left) x (resultCallback mvar) + onResult_ x (void . tryPutMVar mvar . Left) (resultCallback mvar) readMVar mvar >>= either throwIO pure where resultCallback :: MVar (Either SomeException r) -> Either SomeException r -> IO () @@ -57,17 +56,17 @@ class IsAsync r a | a -> r where -- -- The returned `Disposable` can be used to deregister the callback. onResult - :: (SomeException -> IO ()) - -- ^ callback exception handler - -> a + :: a -- ^ async + -> (SomeException -> IO ()) + -- ^ callback exception handler -> (Either SomeException r -> IO ()) -- ^ callback -> IO Disposable onResult_ - :: (SomeException -> IO ()) - -> a + :: a + -> (SomeException -> IO ()) -> (Either SomeException r -> IO ()) -> IO () onResult_ x y = void . onResult x y @@ -80,16 +79,16 @@ data Async r = forall a. IsAsync r a => SomeAsync a instance IsAsync r (Async r) where wait (SomeAsync x) = wait x - onResult y (SomeAsync x) = onResult y x - onResult_ y (SomeAsync x) = onResult_ y x + onResult (SomeAsync x) y = onResult x y + onResult_ (SomeAsync x) y = onResult_ x y peekAsync (SomeAsync x) = peekAsync x - toAsync = id + newtype CompletedAsync r = CompletedAsync (Either SomeException r) instance IsAsync r (CompletedAsync r) where wait (CompletedAsync value) = either throwIO pure value - onResult callbackExceptionHandler (CompletedAsync value) callback = noDisposable <$ (callback value `catch` callbackExceptionHandler) + onResult (CompletedAsync value) callbackExceptionHandler callback = noDisposable <$ (callback value `catch` callbackExceptionHandler) peekAsync (CompletedAsync value) = pure $ Just value completedAsync :: Either SomeException r -> Async r @@ -104,45 +103,88 @@ failedAsync = completedAsync . Left -- * AsyncIO -newtype AsyncIO r = AsyncIO (IO (Async r)) +data AsyncIO r + = AsyncIOSuccess r + | AsyncIOFailure SomeException + | AsyncIOIO (IO r) + | AsyncIOAsync (Async r) + | AsyncIOPlumbing (IO (AsyncIO r)) instance Functor AsyncIO where - fmap f = (pure . f =<<) + fmap fn (AsyncIOSuccess x) = AsyncIOSuccess (fn x) + fmap _ (AsyncIOFailure x) = AsyncIOFailure x + fmap fn (AsyncIOIO x) = AsyncIOIO (fn <$> x) + fmap fn (AsyncIOAsync x) = bindAsync x (pure . fn) + fmap fn (AsyncIOPlumbing x) = AsyncIOPlumbing (fn <<$>> x) instance Applicative AsyncIO where - pure = await . successfulAsync + pure = AsyncIOSuccess (<*>) pf px = pf >>= \f -> f <$> px liftA2 f px py = px >>= \x -> f x <$> py instance Monad AsyncIO where (>>=) :: forall a b. AsyncIO a -> (a -> AsyncIO b) -> AsyncIO b - lhs >>= fn = AsyncIO $ newAsyncVar >>= go - where - go resultVar = do - lhsAsync <- async lhs - lhsAsync `onResultBound` \case - Right lhsResult -> do - rhsAsync <- async $ fn lhsResult - rhsAsync `onResultBound` putAsyncVarEither resultVar - Left lhsEx -> putAsyncVarEither resultVar (Left lhsEx) - pure $ toAsync resultVar - where - onResultBound :: forall r. Async r -> (Either SomeException r -> IO ()) -> IO () - onResultBound = onResult_ (putAsyncVarEither resultVar . Left) + (>>=) (AsyncIOSuccess x) fn = fn x + (>>=) (AsyncIOFailure x) _ = AsyncIOFailure x + (>>=) (AsyncIOIO x) fn = AsyncIOPlumbing $ either AsyncIOFailure fn <$> try x + (>>=) (AsyncIOAsync x) fn = bindAsync x fn + (>>=) (AsyncIOPlumbing x) fn = AsyncIOPlumbing $ (>>= fn) <$> x instance MonadIO AsyncIO where - liftIO = AsyncIO . fmap successfulAsync + liftIO = AsyncIOIO + +instance MonadThrow AsyncIO where + throwM = AsyncIOFailure . toException + +instance MonadCatch AsyncIO where + catch :: Exception e => AsyncIO a -> (e -> AsyncIO a) -> AsyncIO a + catch x@(AsyncIOSuccess _) _ = x + catch x@(AsyncIOFailure ex) handler = maybe x handler (fromException ex) + catch (AsyncIOIO x) handler = AsyncIOIO (try x) >>= handleEither handler + catch (AsyncIOAsync x) handler = bindAsyncCatch x (handleEither handler) + catch (AsyncIOPlumbing x) handler = AsyncIOPlumbing $ x >>= pure . (`catch` handler) + +handleEither :: Exception e => (e -> AsyncIO a) -> Either SomeException a -> AsyncIO a +handleEither handler (Left ex) = maybe (AsyncIOFailure ex) handler (fromException ex) +handleEither _ (Right r) = pure r + +bindAsync :: forall a b. Async a -> (a -> AsyncIO b) -> AsyncIO b +bindAsync x fn = bindAsyncCatch x (either (AsyncIOFailure) fn) + +bindAsyncCatch :: forall a b. Async a -> (Either SomeException a -> AsyncIO b) -> AsyncIO b +bindAsyncCatch x fn = AsyncIOPlumbing $ newAsyncVar >>= bindAsync' + where + bindAsync' resultVar = do + withResult x resultVar step + pure $ await resultVar + step :: (Either SomeException b -> IO ()) -> Either SomeException a -> IO () + step put = putAsyncIOResult put . fn + +withResult :: Async a -> AsyncVar b -> ((Either SomeException b -> IO ()) -> Either SomeException a -> IO ()) -> IO () +withResult x var fn = onResult_ x (failAsyncVar var) (fn (putAsyncVarEither var)) + +putAsyncIOResult :: (Either SomeException a -> IO ()) -> AsyncIO a -> IO () +putAsyncIOResult put (AsyncIOSuccess x) = put (Right x) +putAsyncIOResult put (AsyncIOFailure x) = put (Left x) +putAsyncIOResult put (AsyncIOIO x) = try x >>= put +putAsyncIOResult put (AsyncIOAsync x) = onResult_ x (put . Left) put +putAsyncIOResult put (AsyncIOPlumbing x) = x >>= putAsyncIOResult put + -- | Run the synchronous part of an `AsyncIO` and then return an `Async` that can be used to wait for completion of the synchronous part. -async :: MonadIO m => AsyncIO r -> m (Async r) -async (AsyncIO x) = liftIO x +async :: AsyncIO r -> AsyncIO (Async r) +async = fmap successfulAsync await :: IsAsync r a => a -> AsyncIO r -await = AsyncIO . pure . toAsync +await = AsyncIOAsync . toAsync -- | Run an `AsyncIO` to completion and return the result. runAsyncIO :: AsyncIO r -> IO r -runAsyncIO = async >=> wait +runAsyncIO (AsyncIOSuccess x) = pure x +runAsyncIO (AsyncIOFailure x) = throwIO x +runAsyncIO (AsyncIOIO x) = x +runAsyncIO (AsyncIOAsync x) = wait x +runAsyncIO (AsyncIOPlumbing x) = x >>= runAsyncIO -- TODO error handling awaitResult :: AsyncIO (Async r) -> AsyncIO r awaitResult = (await =<<) @@ -154,7 +196,6 @@ mapAsync :: (a -> b) -> Async a -> AsyncIO (Async b) mapAsync fn = async . fmap fn . await - -- ** Forking asyncs -- TODO @@ -176,8 +217,8 @@ instance IsAsync r (AsyncVar r) where AsyncVarCompleted x -> Just x AsyncVarOpen _ -> Nothing - onResult :: (SomeException -> IO ()) -> AsyncVar r -> (Either SomeException r -> IO ()) -> IO Disposable - onResult callbackExceptionHandler (AsyncVar mvar) callback = + onResult :: AsyncVar r -> (SomeException -> IO ()) -> (Either SomeException r -> IO ()) -> IO Disposable + onResult (AsyncVar mvar) callbackExceptionHandler callback = modifyMVar mvar $ \case AsyncVarOpen callbacks -> do key <- newUnique @@ -194,7 +235,10 @@ newAsyncVar :: MonadIO m => m (AsyncVar r) newAsyncVar = liftIO $ AsyncVar <$> newMVar (AsyncVarOpen HM.empty) putAsyncVar :: MonadIO m => AsyncVar a -> a -> m () -putAsyncVar asyncVar = putAsyncVarEither asyncVar . Right +putAsyncVar var = putAsyncVarEither var . Right + +failAsyncVar :: MonadIO m => AsyncVar a -> SomeException -> m () +failAsyncVar var = putAsyncVarEither var . Left putAsyncVarEither :: MonadIO m => AsyncVar a -> Either SomeException a -> m () putAsyncVarEither (AsyncVar mvar) value = liftIO $ do @@ -226,10 +270,6 @@ class IsDisposable a where disposeIO :: IsDisposable a => a -> IO () disposeIO = runAsyncIO . dispose --- | Dispose a resource. Returns without waiting for the resource to be released if possible. -disposeEventually :: IsDisposable a => a -> IO () -disposeEventually = void . async . dispose - instance IsDisposable a => IsDisposable (Maybe a) where dispose = mapM_ dispose diff --git a/src/Quasar/Observable.hs b/src/Quasar/Observable.hs index 9f7cd42..e77f8cf 100644 --- a/src/Quasar/Observable.hs +++ b/src/Quasar/Observable.hs @@ -187,11 +187,9 @@ instance forall o i v. (IsObservable i o, IsObservable v i) => IsObservable v (J outerCallback :: MVar Disposable -> ObservableMessage i -> IO () outerCallback innerDisposableMVar (_reason, innerObservable) = do oldInnerSubscription <- takeMVar innerDisposableMVar - void $ async $ do - dispose oldInnerSubscription - liftIO $ do - newInnerSubscription <- subscribe innerObservable callback - putMVar innerDisposableMVar newInnerSubscription + disposeIO oldInnerSubscription + newInnerSubscription <- subscribe innerObservable callback + putMVar innerDisposableMVar newInnerSubscription joinObservable :: (IsObservable i o, IsObservable v i) => o -> Observable v joinObservable = Observable . JoinedObservable diff --git a/test/Quasar/AsyncSpec.hs b/test/Quasar/AsyncSpec.hs index 8c5a1f7..15bad2b 100644 --- a/test/Quasar/AsyncSpec.hs +++ b/test/Quasar/AsyncSpec.hs @@ -2,6 +2,7 @@ module Quasar.AsyncSpec (spec) where import Control.Concurrent import Control.Exception (throwIO) +import Control.Monad (void) import Control.Monad.IO.Class import Data.Either (isRight) import Prelude @@ -26,7 +27,7 @@ spec = parallel $ do avar <- newAsyncVar :: IO (AsyncVar ()) mvar <- newEmptyMVar - onResult_ throwIO avar (putMVar mvar) + onResult_ avar throwIO (putMVar mvar) (() <$) <$> tryTakeMVar mvar `shouldReturn` Nothing @@ -46,3 +47,27 @@ spec = parallel $ do it "can continue after awaiting an already finished operation" $ do runAsyncIO (await =<< async (pure 42 :: AsyncIO Int)) `shouldReturn` 42 + + it "can fmap the result of an already finished async" $ do + avar <- newAsyncVar :: IO (AsyncVar ()) + putAsyncVar avar () + runAsyncIO (id <$> await avar) + + it "can fmap the result of an async that is completed later" $ do + avar <- newAsyncVar :: IO (AsyncVar ()) + void $ forkIO $ do + threadDelay 100000 + putAsyncVar avar () + runAsyncIO (id <$> await avar) + + it "can bind the result of an already finished async" $ do + avar <- newAsyncVar :: IO (AsyncVar ()) + putAsyncVar avar () + runAsyncIO (await avar >>= pure) + + it "can bind the result of an async that is completed later" $ do + avar <- newAsyncVar :: IO (AsyncVar ()) + void $ forkIO $ do + threadDelay 100000 + putAsyncVar avar () + runAsyncIO (await avar >>= pure) -- GitLab