Newer
Older
module Quasar.Async.Unmanaged (
-- ** Unmanaged variant
unmanagedAsync,
unmanagedAsync_,
unmanagedAsyncWithUnmask,
unmanagedAsyncWithUnmask_,
CancelAsync(..),
AsyncDisposed(..),
AsyncException(..),
-- ** Implementation internals
coreAsyncImplementation
) where
import Control.Concurrent (ThreadId, forkIOWithUnmask, throwTo)
import Control.Concurrent.STM
import Control.Monad.Catch
import Quasar.Awaitable
import Quasar.Disposable
import Quasar.Prelude
-- | A task is an operation (e.g. a thread or a network request) that is running asynchronously and can be cancelled.
-- It has a result and can fail.
--
-- The result (or exception) can be aquired by using the `IsAwaitable` class (e.g. by calling `await` or `awaitIO`).
-- It is possible to cancel the task by using `dispose` or `cancelTask` if the operation has not been completed.
data Task r = Task Unique (TVar TaskState) DisposableFinalizers (Awaitable r)
data TaskState = TaskStateInitializing | TaskStateRunning ThreadId | TaskStateThrowing | TaskStateCompleted
instance IsAwaitable r (Task r) where
toAwaitable (Task _ _ _ resultAwaitable) = resultAwaitable
instance IsDisposable (Task r) where
beginDispose self@(Task key stateVar _ _) = uninterruptibleMask_ do
join $ atomically do
readTVar stateVar >>= \case
TaskStateRunning threadId -> do
writeTVar stateVar TaskStateThrowing
pure do
throwTo threadId $ CancelAsync key
atomically $ writeTVar stateVar TaskStateCompleted
TaskStateThrowing -> pure $ pure ()
TaskStateCompleted -> pure $ pure ()
-- Wait for task completion or failure. Tasks must not ignore `CancelTask` or this will hang.
pure $ DisposeResultAwait $ isDisposed self
isDisposed (Task _ _ _ resultAwaitable) = (() <$ resultAwaitable) `catchAll` \_ -> pure ()
registerFinalizer (Task _ _ finalizers _) = defaultRegisterFinalizer finalizers
instance Functor Task where
fmap fn (Task key actionVar finalizerVar resultAwaitable) = Task key actionVar finalizerVar (fn <$> resultAwaitable)
data CancelAsync = CancelAsync Unique
deriving stock Eq
instance Show CancelAsync where
show _ = "CancelAsync"
instance Exception CancelAsync where
data AsyncDisposed = AsyncDisposed
deriving stock (Eq, Show)
instance Exception AsyncDisposed where
-- TODO Needs a descriptive name. This is similar in functionality to `ExceptionThrownInLinkedThread`
data AsyncException = AsyncException SomeException
deriving stock Show
deriving anyclass Exception
-- | Base implementation for the `unmanagedAsync`- and `Quasar.Async.async`-class of functions.
coreAsyncImplementation :: MonadIO m => (SomeException -> IO ()) -> ((forall b. IO b -> IO b) -> IO a) -> m (Task a)
coreAsyncImplementation handler action = do
liftIO $ mask_ do
resultVar <- newAsyncVar
stateVar <- newTVarIO TaskStateInitializing
finalizers <- newDisposableFinalizers
threadId <- forkIOWithUnmask \unmask ->
handleAll
do \ex -> fail $ "coreAsyncImplementation thread failed: " <> displayException ex
result <- try $ catchAll
do action unmask
\ex -> do
-- Rewrite exception if its the cancel exception for this async
when (fromException ex == Just (CancelAsync key)) $ throwIO AsyncDisposed
throwIO $ AsyncException ex
-- The `action` has completed its work.
-- "disarm" dispose:
do \(CancelAsync exKey) -> key == exKey
do
atomically $ readTVar stateVar >>= \case
TaskStateInitializing -> retry
TaskStateRunning _ -> writeTVar stateVar TaskStateCompleted
TaskStateThrowing -> retry -- Could not disarm so we have to wait for the exception to arrive
TaskStateCompleted -> pure ()
do mempty -- ignore exception if it matches; this can only happen once (see TaskStateThrowing above)
catchAll
case result of
Left (fromException -> Just AsyncDisposed) -> pure ()
Left ex -> handler ex
_ -> pure ()
\ex -> traceIO $ "An exception was thrown while handling an async exception: " <> displayException ex
atomically do
putAsyncVarEitherSTM_ resultVar result
defaultRunFinalizers finalizers
atomically $ writeTVar stateVar $ TaskStateRunning threadId
pure $ Task key stateVar finalizers (toAwaitable resultVar)
unmanagedAsync :: MonadIO m => IO a -> m (Task a)
unmanagedAsync action = unmanagedAsyncWithUnmask \unmask -> unmask action
unmanagedAsync_ :: MonadIO m => IO () -> m Disposable
unmanagedAsync_ action = toDisposable <$> unmanagedAsync action
unmanagedAsyncWithUnmask :: MonadIO m => ((forall b. IO b -> IO b) -> IO a) -> m (Task a)
unmanagedAsyncWithUnmask = coreAsyncImplementation (traceIO . ("Unhandled exception in unmanaged async: " <>) . displayException)
unmanagedAsyncWithUnmask_ :: MonadIO m => ((forall b. IO b -> IO b) -> IO ()) -> m Disposable
unmanagedAsyncWithUnmask_ action = toDisposable <$> unmanagedAsyncWithUnmask action