Skip to content
Snippets Groups Projects
Commit 11e293c8 authored by Jens Nolte's avatar Jens Nolte
Browse files

Add error handling to AsyncIO


Co-authored-by: default avatarJan Beinke <git@janbeinke.com>
parent c2d927c9
No related branches found
No related tags found
No related merge requests found
Pipeline #2316 passed
......@@ -51,6 +51,7 @@ common shared-properties
-Wno-missing-import-lists
-Wno-unsafe
-Wno-all-missed-specialisations
-Werror=missing-methods
common shared-executable-properties
import: shared-properties
......
......@@ -2,6 +2,9 @@ module Quasar.Core (
-- * Async
IsAsync(..),
Async,
successfulAsync,
failedAsync,
completedAsync,
-- * AsyncIO
AsyncIO,
......@@ -25,6 +28,7 @@ module Quasar.Core (
noDisposable,
) where
import Control.Exception (try)
import Data.HashMap.Strict qualified as HM
import Quasar.Prelude
......@@ -33,16 +37,18 @@ import Quasar.Prelude
class IsAsync r a | a -> r where
-- | Wait until the promise is settled and return the result.
wait :: a -> IO r
wait promise = do
wait x = do
mvar <- newEmptyMVar
onResult_ promise (resultCallback mvar)
readMVar mvar
onResult_ x (resultCallback mvar)
readMVar mvar >>= either throwIO pure
where
resultCallback :: MVar r -> r -> IO ()
resultCallback :: MVar (Either SomeException r) -> Either SomeException r -> IO ()
resultCallback mvar result = do
success <- tryPutMVar mvar result
unless success $ fail "Callback was called multiple times"
peekAsync :: a -> IO (Maybe (Either SomeException r))
-- | Register a callback, that will be called once the promise is settled.
-- If the promise is already settled, the callback will be called immediately instead.
--
......@@ -50,11 +56,11 @@ class IsAsync r a | a -> r where
onResult
:: a
-- ^ async
-> (r -> IO ())
-> (Either SomeException r -> IO ())
-- ^ callback
-> IO Disposable
onResult_ :: a -> (r -> IO ()) -> IO ()
onResult_ :: a -> (Either SomeException r -> IO ()) -> IO ()
onResult_ x = void . onResult x
toAsync :: a -> Async r
......@@ -68,58 +74,24 @@ instance IsAsync r (Async r) where
wait (SomeAsync x) = wait x
onResult (SomeAsync x) = onResult x
onResult_ (SomeAsync x) = onResult_ x
peekAsync (SomeAsync x) = peekAsync x
toAsync = id
--instance Functor Async where
-- fmap fn = toAsync . MappedAsync fn
--
--instance Applicative Async where
-- pure = toAsync . CompletedAsync
-- (<*>) pf px = pf >>= \f -> f <$> px
-- liftA2 f px py = px >>= \x -> f x <$> py
--
--instance Monad Async where
-- x >>= y = toAsync $ BindAsync x y
--
--
--instance Semigroup r => Semigroup (Async r) where
-- (<>) = liftA2 (<>)
--
--instance Monoid r => Monoid (Async r) where
-- mempty = pure mempty
-- mconcat = fmap mconcat . sequence
completedAsync :: x -> Async x
completedAsync = toAsync . CompletedAsync
newtype CompletedAsync r = CompletedAsync r
newtype CompletedAsync r = CompletedAsync (Either SomeException r)
instance IsAsync r (CompletedAsync r) where
wait (CompletedAsync value) = pure value
wait (CompletedAsync value) = either throwIO pure value
onResult (CompletedAsync value) callback = noDisposable <$ callback value
peekAsync (CompletedAsync value) = pure $ Just value
data MappedAsync r = forall a. MappedAsync (a -> r) (Async a)
instance IsAsync r (MappedAsync r) where
onResult (MappedAsync fn x) callback = onResult x $ callback . fn
onResult_ (MappedAsync fn x) callback = onResult_ x $ callback . fn
data BindAsync r = forall a. BindAsync (Async a) (a -> Async r)
instance IsAsync r (BindAsync r) where
onResult (BindAsync px fn) callback = do
(disposableMVar :: MVar (Maybe Disposable)) <- newEmptyMVar
d1 <- onResult px $ \x ->
modifyMVar_ disposableMVar $ \case
-- Already disposed
Nothing -> pure Nothing
Just _ -> do
d2 <- onResult (fn x) callback
pure $ Just d2
putMVar disposableMVar $ Just d1
pure $ mkDisposable $ do
currentDisposable <- liftIO $ readMVar disposableMVar
dispose currentDisposable
onResult_ (BindAsync px fn) callback = onResult_ px $ \x -> onResult_ (fn x) callback
completedAsync :: Either SomeException r -> Async r
completedAsync = toAsync . CompletedAsync
successfulAsync :: r -> Async r
successfulAsync = completedAsync . Right
failedAsync :: SomeException -> Async r
failedAsync = completedAsync . Left
-- * AsyncIO
......@@ -130,23 +102,22 @@ instance Functor AsyncIO where
fmap f = (pure . f =<<)
instance Applicative AsyncIO where
pure = AsyncIO . pure . completedAsync
liftA2 f px py = do
ax <- async px
y <- py
x <- await ax
await $ completedAsync (f x y)
pure = await . successfulAsync
(<*>) pf px = pf >>= \f -> f <$> px
liftA2 f px py = px >>= \x -> f x <$> py
instance Monad AsyncIO where
lhs >>= fn = AsyncIO $ do
resultVar <- newAsyncVar
lhsAsync <- startAsyncIO lhs
lhsAsync `onResult_` \lhsResult -> do
rhsAsync <- startAsyncIO $ fn lhsResult
rhsAsync `onResult_` putAsyncVar resultVar
lhsAsync `onResult_` \case
Right lhsResult -> do
rhsAsync <- startAsyncIO $ fn lhsResult
rhsAsync `onResult_` putAsyncVarEither resultVar
Left lhsEx -> putAsyncVarEither resultVar (Left lhsEx)
pure $ toAsync resultVar
instance MonadIO AsyncIO where
liftIO = AsyncIO . fmap completedAsync
liftIO = AsyncIO . fmap completedAsync . try
-- | Run the synchronous part of an `AsyncIO` and then return an `Async` that can be used to wait for completion of the synchronous part.
......@@ -158,7 +129,7 @@ await = AsyncIO . pure . toAsync
-- | Run an `AsyncIO` to completion and return the result.
runAsyncIO :: AsyncIO r -> IO r
runAsyncIO = wait <=< startAsyncIO
runAsyncIO = startAsyncIO >=> wait
-- | Run the synchronous part of an `AsyncIO`. Returns an `Async` that can be used to wait for completion of the operation.
......@@ -167,6 +138,7 @@ startAsyncIO (AsyncIO x) = x
-- ** Forking asyncs
-- TODO
--class IsAsyncForkable m where
-- asyncThread :: m r -> AsyncIO r
......@@ -176,73 +148,45 @@ startAsyncIO (AsyncIO x) = x
-- ** AsyncVar
-- | The default implementation for a `Async` that can be fulfilled later.
data AsyncVar r = AsyncVar (MVar r) (MVar (Maybe (HM.HashMap Unique (r -> IO ()))))
newtype AsyncVar r = AsyncVar (MVar (AsyncVarState r))
data AsyncVarState r = AsyncVarCompleted (Either SomeException r) | AsyncVarOpen (HM.HashMap Unique (Either SomeException r -> IO ()))
instance IsAsync r (AsyncVar r) where
wait :: AsyncVar r -> IO r
wait (AsyncVar valueMVar _) = readMVar valueMVar
onResult :: AsyncVar r -> (r -> IO ()) -> IO Disposable
onResult (AsyncVar valueMVar callbackMVar) callback =
modifyMVar callbackMVar $ \case
Just callbacks -> do
peekAsync :: AsyncVar r -> IO (Maybe (Either SomeException r))
peekAsync (AsyncVar mvar) = readMVar mvar >>= pure . \case
AsyncVarCompleted x -> Just x
AsyncVarOpen _ -> Nothing
onResult :: AsyncVar r -> (Either SomeException r -> IO ()) -> IO Disposable
onResult (AsyncVar mvar) callback =
modifyMVar mvar $ \case
AsyncVarOpen callbacks -> do
key <- newUnique
pure (Just (HM.insert key callback callbacks), removeHandler key)
Nothing -> (Nothing, noDisposable) <$ (callback =<< readMVar valueMVar)
pure (AsyncVarOpen (HM.insert key callback callbacks), removeHandler key)
x@(AsyncVarCompleted value) -> (x, noDisposable) <$ callback value
where
removeHandler :: Unique -> Disposable
removeHandler key = synchronousDisposable $ modifyMVar_ callbackMVar $ pure . fmap (HM.delete key)
removeHandler key = synchronousDisposable $ modifyMVar_ mvar $ pure . \case
x@(AsyncVarCompleted _) -> x
AsyncVarOpen x -> AsyncVarOpen $ HM.delete key x
newAsyncVar :: MonadIO m => m (AsyncVar r)
newAsyncVar = liftIO $ AsyncVar <$> newEmptyMVar <*> newMVar (Just HM.empty)
newAsyncVar = liftIO $ AsyncVar <$> newMVar (AsyncVarOpen HM.empty)
putAsyncVar :: MonadIO m => AsyncVar a -> a -> m ()
putAsyncVar (AsyncVar valueMVar callbackMVar) value = liftIO $ do
success <- tryPutMVar valueMVar value
unless success $ fail "An AsyncVar can only be fulfilled once"
callbacks <- modifyMVar callbackMVar (pure . (Nothing, ) . concatMap HM.elems)
mapM_ ($ value) callbacks
-- ** Async cache
--data CachedAsyncState r = CacheNoCallbacks | CacheHasCallbacks Disposable (HM.HashMap Unique (r -> IO ())) | CacheSettled r
--data CachedAsync r = CachedAsync (Async r) (MVar (CachedAsyncState r))
--
--instance IsAsync r (CachedAsync r) where
-- onResult (CachedAsync baseAsync stateMVar) callback =
-- modifyMVar stateMVar $ \case
-- CacheNoCallbacks -> do
-- key <- newUnique
-- disp <- onResult baseAsync baseAsyncResultCallback
-- pure (CacheHasCallbacks disp (HM.singleton key callback), removeHandler key)
-- CacheHasCallbacks disp callbacks -> do
-- key <- newUnique
-- pure (CacheHasCallbacks disp (HM.insert key callback callbacks), removeHandler key)
-- x@(CacheSettled value) -> (x, noDisposable) <$ callback value
-- where
-- removeHandler :: Unique -> Disposable
-- removeHandler key = mkDisposable $ do
-- state <- liftIO $ takeMVar stateMVar
-- newState <- case state of
-- CacheHasCallbacks disp callbacks -> do
-- let newCallbacks = HM.delete key callbacks
-- if HM.null newCallbacks
-- then CacheNoCallbacks <$ dispose disp
-- else pure (CacheHasCallbacks disp newCallbacks)
-- x -> pure x
-- liftIO $ putMVar stateMVar newState
-- baseAsyncResultCallback :: r -> IO ()
-- baseAsyncResultCallback value = do
-- -- FIXME race condition: mvar is blocked by caller when baseAsync runs synchronous
-- callbacks <- modifyMVar stateMVar $ \case
-- CacheHasCallbacks _ callbacks -> pure (CacheSettled value, HM.elems callbacks)
-- CacheNoCallbacks -> pure (CacheSettled value, [])
-- CacheSettled _ -> fail "Callback was called multiple times"
-- mapM_ ($ value) callbacks
--
--newCachedAsync :: (IsAsync r p, MonadIO m) => p -> m (Async r)
--newCachedAsync x = liftIO $ toAsync . CachedAsync (toAsync x) <$> newMVar CacheNoCallbacks
putAsyncVar asyncVar = putAsyncVarEither asyncVar . Right
putAsyncVarEither :: MonadIO m => AsyncVar a -> Either SomeException a -> m ()
putAsyncVarEither (AsyncVar mvar) value = liftIO $ do
modifyMVar_ mvar $ \case
AsyncVarCompleted _ -> fail "An AsyncVar can only be fulfilled once"
AsyncVarOpen callbacksMap -> do
let callbacks = HM.elems callbacksMap
-- NOTE disposing a callback while it is called is a deadlock
mapM_ ($ value) callbacks
pure (AsyncVarCompleted value)
-- * Disposable
......
......@@ -3,10 +3,14 @@ module Quasar.AsyncSpec (spec) where
import Control.Applicative (liftA2)
import Control.Concurrent
import Control.Monad.IO.Class
import Data.Either (isRight)
import Prelude
import Test.Hspec
import Quasar.Core
shouldSatisfyM :: (HasCallStack, Show a) => IO a -> (a -> Bool) -> Expectation
shouldSatisfyM action expected = action >>= (`shouldSatisfy` expected)
spec :: Spec
spec = parallel $ do
describe "AsyncVar" $ do
......@@ -24,10 +28,10 @@ spec = parallel $ do
mvar <- newEmptyMVar
avar `onResult_` putMVar mvar
tryTakeMVar mvar `shouldReturn` Nothing
(() <$) <$> tryTakeMVar mvar `shouldReturn` Nothing
putAsyncVar avar ()
tryTakeMVar mvar `shouldReturn` Just ()
tryTakeMVar mvar `shouldSatisfyM` maybe False isRight
describe "AsyncIO" $ do
it "binds pure operations" $ do
......@@ -43,19 +47,11 @@ spec = parallel $ do
it "can continue after awaiting an already finished operation" $ do
runAsyncIO (await =<< async (pure 42 :: AsyncIO Int)) `shouldReturn` 42
it "can continue after awaiting an async that itself finishes afterwards" $ do
avar <- newAsyncVar
runAsyncIO $ await avar *> putAsyncVar avar ()
it "liftA2" $ do
avar <- newAsyncVar
runAsyncIO (liftA2 (,) (await avar) (putAsyncVar avar 42)) `shouldReturn` (42 :: Int, ())
it "can continue after blocking on an async that is completed from another thread" $ do
a1 <- newAsyncVar
a2 <- newAsyncVar
a3 <- newAsyncVar
a4 <- newAsyncVar
_ <- forkIO $ runAsyncIO $ await a1 >>= putAsyncVar a2 >> await a3 >>= putAsyncVar a4
runAsyncIO ((await a2 >> (await a4 *> putAsyncVar a3 1)) *> putAsyncVar a1 41)
liftA2 (+) (wait a2) (wait a4) `shouldReturn` (42 :: Int)
--it "can continue after blocking on an async that is completed from another thread" $ do
-- a1 <- newAsyncVar
-- a2 <- newAsyncVar
-- a3 <- newAsyncVar
-- a4 <- newAsyncVar
-- _ <- forkIO $ runAsyncIO $ await a1 >>= putAsyncVar a2 >> await a3 >>= putAsyncVar a4
-- runAsyncIO ((await a2 >> (await a4 *> putAsyncVar a3 1)) *> putAsyncVar a1 41)
-- liftA2 (+) (wait a2) (wait a4) `shouldReturn` (42 :: Int)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment