Skip to content
Snippets Groups Projects
Commit 0c8b0ec5 authored by Jens Nolte's avatar Jens Nolte
Browse files

Forward exceptions from async tasks to resource manager

parent 900afc03
No related branches found
No related tags found
No related merge requests found
...@@ -5,18 +5,19 @@ module Quasar.Async ( ...@@ -5,18 +5,19 @@ module Quasar.Async (
asyncWithUnmask, asyncWithUnmask,
asyncWithUnmask_, asyncWithUnmask_,
-- ** Task exceptions -- ** Async exceptions
CancelTask(..), CancelAsync(..),
TaskDisposed(..), AsyncDisposed(..),
AsyncException(..),
) where ) where
import Control.Monad.Catch
import Control.Monad.Reader import Control.Monad.Reader
import Quasar.Async.Unmanaged import Quasar.Async.Unmanaged
import Quasar.Awaitable import Quasar.Awaitable
import Quasar.Prelude import Quasar.Prelude
import Quasar.ResourceManager import Quasar.ResourceManager
-- | TODO: Documentation -- | TODO: Documentation
-- --
-- The action will be run with asynchronous exceptions unmasked. -- The action will be run with asynchronous exceptions unmasked.
...@@ -30,12 +31,18 @@ asyncWithUnmask :: MonadResourceManager m => ((ResourceManagerIO a -> ResourceMa ...@@ -30,12 +31,18 @@ asyncWithUnmask :: MonadResourceManager m => ((ResourceManagerIO a -> ResourceMa
asyncWithUnmask action = do asyncWithUnmask action = do
resourceManager <- askResourceManager resourceManager <- askResourceManager
toAwaitable <$> registerNewResource do toAwaitable <$> registerNewResource do
unmanagedAsyncWithUnmask (\unmask -> runReaderT (action (liftUnmask unmask)) resourceManager) coreAsyncImplementation (handler resourceManager) \unmask ->
onResourceManager resourceManager (action (liftUnmask unmask))
where where
handler :: ResourceManager -> SomeException -> IO ()
handler resourceManager ex = when (fromException ex /= Just AsyncDisposed) do
-- Throwing to the resource manager is safe because the handler runs on the async thread the resource manager
-- cannot reach disposed state until the thread exits
throwToResourceManager resourceManager ex
liftUnmask :: (forall b. IO b -> IO b) -> ResourceManagerIO a -> ResourceManagerIO a liftUnmask :: (forall b. IO b -> IO b) -> ResourceManagerIO a -> ResourceManagerIO a
liftUnmask unmask innerAction = do liftUnmask unmask innerAction = do
resourceManager <- askResourceManager resourceManager <- askResourceManager
liftIO $ unmask $ runReaderT innerAction resourceManager liftIO $ unmask $ onResourceManager resourceManager innerAction
async_ :: MonadResourceManager m => (ResourceManagerIO ()) -> m () async_ :: MonadResourceManager m => (ResourceManagerIO ()) -> m ()
async_ action = void $ async action async_ action = void $ async action
......
...@@ -7,9 +7,13 @@ module Quasar.Async.Unmanaged ( ...@@ -7,9 +7,13 @@ module Quasar.Async.Unmanaged (
unmanagedAsyncWithUnmask_, unmanagedAsyncWithUnmask_,
-- ** Task exceptions -- ** Task exceptions
CancelTask(..), CancelAsync(..),
TaskDisposed(..), AsyncDisposed(..),
)where AsyncException(..),
-- ** Implementation internals
coreAsyncImplementation
) where
import Control.Concurrent (ThreadId, forkIOWithUnmask, throwTo) import Control.Concurrent (ThreadId, forkIOWithUnmask, throwTo)
...@@ -40,7 +44,7 @@ instance IsDisposable (Task r) where ...@@ -40,7 +44,7 @@ instance IsDisposable (Task r) where
TaskStateRunning threadId -> do TaskStateRunning threadId -> do
writeTVar stateVar TaskStateThrowing writeTVar stateVar TaskStateThrowing
pure do pure do
throwTo threadId $ CancelTask key throwTo threadId $ CancelAsync key
atomically $ writeTVar stateVar TaskStateCompleted atomically $ writeTVar stateVar TaskStateCompleted
TaskStateThrowing -> pure $ pure () TaskStateThrowing -> pure $ pure ()
TaskStateCompleted -> pure $ pure () TaskStateCompleted -> pure $ pure ()
...@@ -56,25 +60,26 @@ instance Functor Task where ...@@ -56,25 +60,26 @@ instance Functor Task where
fmap fn (Task key actionVar finalizerVar resultAwaitable) = Task key actionVar finalizerVar (fn <$> resultAwaitable) fmap fn (Task key actionVar finalizerVar resultAwaitable) = Task key actionVar finalizerVar (fn <$> resultAwaitable)
data CancelTask = CancelTask Unique data CancelAsync = CancelAsync Unique
instance Show CancelTask where deriving stock Eq
show _ = "CancelTask" instance Show CancelAsync where
instance Exception CancelTask where show _ = "CancelAsync"
instance Exception CancelAsync where
data TaskDisposed = TaskDisposed data AsyncDisposed = AsyncDisposed
deriving stock Show deriving stock (Eq, Show)
instance Exception TaskDisposed where instance Exception AsyncDisposed where
-- TODO Needs a descriptive name. This is similar in functionality to `ExceptionThrownInLinkedThread`
data AsyncException = AsyncException SomeException
deriving stock Show
deriving anyclass Exception
unmanagedAsync :: MonadIO m => IO a -> m (Task a)
unmanagedAsync action = unmanagedAsyncWithUnmask \unmask -> unmask action
unmanagedAsync_ :: MonadIO m => IO () -> m Disposable -- | Base implementation for the `unmanagedAsync`- and `Quasar.Async.async`-class of functions.
unmanagedAsync_ action = toDisposable <$> unmanagedAsync action coreAsyncImplementation :: MonadIO m => (SomeException -> IO ()) -> ((forall b. IO b -> IO b) -> IO a) -> m (Task a)
coreAsyncImplementation handler action = do
unmanagedAsyncWithUnmask :: MonadIO m => ((forall b. IO b -> IO b) -> IO a) -> m (Task a)
unmanagedAsyncWithUnmask action = do
liftIO $ mask_ do liftIO $ mask_ do
key <- newUnique key <- newUnique
resultVar <- newAsyncVar resultVar <- newAsyncVar
...@@ -83,18 +88,19 @@ unmanagedAsyncWithUnmask action = do ...@@ -83,18 +88,19 @@ unmanagedAsyncWithUnmask action = do
threadId <- forkIOWithUnmask \unmask -> threadId <- forkIOWithUnmask \unmask ->
handleAll handleAll
do \ex -> fail $ "unmanagedAsyncWithUnmask thread failed: " <> displayException ex do \ex -> fail $ "coreAsyncImplementation thread failed: " <> displayException ex
do do
result <- try $ handleIf result <- try $ catchAll
do \(CancelTask exKey) -> key == exKey do action unmask
do \_ -> throwIO TaskDisposed \ex -> do
do -- Rewrite exception if its the cancel exception for this async
action unmask when (fromException ex == Just (CancelAsync key)) $ throwIO AsyncDisposed
throwIO $ AsyncException ex
-- The `action` has completed its work. -- The `action` has completed its work.
-- "disarm" dispose: -- "disarm" dispose:
handleIf handleIf
do \(CancelTask exKey) -> key == exKey do \(CancelAsync exKey) -> key == exKey
do mempty -- ignore exception if it matches; this can only happen once do mempty -- ignore exception if it matches; this can only happen once
do do
atomically $ readTVar stateVar >>= \case atomically $ readTVar stateVar >>= \case
...@@ -103,14 +109,29 @@ unmanagedAsyncWithUnmask action = do ...@@ -103,14 +109,29 @@ unmanagedAsyncWithUnmask action = do
TaskStateThrowing -> retry -- Could not disarm so we have to wait for the exception to arrive TaskStateThrowing -> retry -- Could not disarm so we have to wait for the exception to arrive
TaskStateCompleted -> pure () TaskStateCompleted -> pure ()
catchAll
case result of
Left ex -> when (fromException ex /= Just AsyncDisposed) $ handler ex
Right _ -> pure ()
\ex -> undefined
atomically do atomically do
putAsyncVarEitherSTM_ resultVar result putAsyncVarEitherSTM_ resultVar result
defaultRunFinalizers finalizers defaultRunFinalizers finalizers
atomically $ writeTVar stateVar $ TaskStateRunning threadId atomically $ writeTVar stateVar $ TaskStateRunning threadId
pure $ Task key stateVar finalizers (toAwaitable resultVar) pure $ Task key stateVar finalizers (toAwaitable resultVar)
unmanagedAsync :: MonadIO m => IO a -> m (Task a)
unmanagedAsync action = unmanagedAsyncWithUnmask \unmask -> unmask action
unmanagedAsync_ :: MonadIO m => IO () -> m Disposable
unmanagedAsync_ action = toDisposable <$> unmanagedAsync action
unmanagedAsyncWithUnmask :: MonadIO m => ((forall b. IO b -> IO b) -> IO a) -> m (Task a)
unmanagedAsyncWithUnmask = coreAsyncImplementation (traceIO . ("Unhandled exception in unmanaged async: " <>) . displayException)
unmanagedAsyncWithUnmask_ :: MonadIO m => ((forall b. IO b -> IO b) -> IO ()) -> m Disposable unmanagedAsyncWithUnmask_ :: MonadIO m => ((forall b. IO b -> IO b) -> IO ()) -> m Disposable
unmanagedAsyncWithUnmask_ action = toDisposable <$> unmanagedAsyncWithUnmask action unmanagedAsyncWithUnmask_ action = toDisposable <$> unmanagedAsyncWithUnmask action
...@@ -204,7 +204,7 @@ newtype Delay = Delay (Task ()) ...@@ -204,7 +204,7 @@ newtype Delay = Delay (Task ())
deriving newtype IsDisposable deriving newtype IsDisposable
instance IsAwaitable () Delay where instance IsAwaitable () Delay where
toAwaitable (Delay task) = toAwaitable task `catch` \TaskDisposed -> throwM TimerCancelled toAwaitable (Delay task) = toAwaitable task `catch` \AsyncDisposed -> throwM TimerCancelled
newDelay :: MonadResourceManager m => Int -> m Delay newDelay :: MonadResourceManager m => Int -> m Delay
newDelay microseconds = registerNewResource $ newUnmanagedDelay microseconds newDelay microseconds = registerNewResource $ newUnmanagedDelay microseconds
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment