Skip to content
Snippets Groups Projects
Unmanaged.hs 5.19 KiB
Newer Older
module Quasar.Async.Unmanaged (
  -- ** Unmanaged variant
  unmanagedAsync,
  unmanagedAsync_,
  unmanagedAsyncWithUnmask,
  unmanagedAsyncWithUnmask_,

  -- ** Task exceptions
  CancelAsync(..),
  AsyncDisposed(..),
  AsyncException(..),

  -- ** Implementation internals
  coreAsyncImplementation
) where


import Control.Concurrent (ThreadId, forkIOWithUnmask, throwTo)
import Control.Concurrent.STM
import Control.Monad.Catch
import Quasar.Awaitable
import Quasar.Disposable
import Quasar.Prelude


-- | A task is an operation (e.g. a thread or a network request) that is running asynchronously and can be cancelled.
-- It has a result and can fail.
--
-- The result (or exception) can be aquired by using the `IsAwaitable` class (e.g. by calling `await` or `awaitIO`).
-- It is possible to cancel the task by using `dispose` or `cancelTask` if the operation has not been completed.
data Task r = Task Unique (TVar TaskState) DisposableFinalizers (Awaitable r)

data TaskState = TaskStateInitializing | TaskStateRunning ThreadId | TaskStateThrowing | TaskStateCompleted

instance IsAwaitable r (Task r) where
  toAwaitable (Task _ _ _ resultAwaitable) = resultAwaitable

instance IsDisposable (Task r) where
  beginDispose self@(Task key stateVar _ _) = uninterruptibleMask_ do
    join $ atomically do
      readTVar stateVar >>= \case
Jens Nolte's avatar
Jens Nolte committed
        TaskStateInitializing -> unreachableCodePathM
        TaskStateRunning threadId -> do
          writeTVar stateVar TaskStateThrowing
          pure do
            throwTo threadId $ CancelAsync key
            atomically $ writeTVar stateVar TaskStateCompleted
        TaskStateThrowing -> pure $ pure ()
        TaskStateCompleted -> pure $ pure ()

    -- Wait for task completion or failure. Tasks must not ignore `CancelTask` or this will hang.
    pure $ DisposeResultAwait $ isDisposed self

  isDisposed (Task _ _ _ resultAwaitable) = (() <$ resultAwaitable) `catchAll` \_ -> pure ()

  registerFinalizer (Task _ _ finalizers _) = defaultRegisterFinalizer finalizers

instance Functor Task where
  fmap fn (Task key actionVar finalizerVar resultAwaitable) = Task key actionVar finalizerVar (fn <$> resultAwaitable)


data CancelAsync = CancelAsync Unique
  deriving stock Eq
instance Show CancelAsync where
  show _ = "CancelAsync"
instance Exception CancelAsync where
data AsyncDisposed = AsyncDisposed
  deriving stock (Eq, Show)
instance Exception AsyncDisposed where
-- TODO Needs a descriptive name. This is similar in functionality to `ExceptionThrownInLinkedThread`
data AsyncException = AsyncException SomeException
  deriving stock Show
  deriving anyclass Exception
-- | Base implementation for the `unmanagedAsync`- and `Quasar.Async.async`-class of functions.
coreAsyncImplementation :: MonadIO m => (SomeException -> IO ()) -> ((forall b. IO b -> IO b) -> IO a) -> m (Task a)
coreAsyncImplementation handler action = do
    key <- newUnique
    stateVar <- newTVarIO TaskStateInitializing
    finalizers <- newDisposableFinalizers

    threadId <- forkIOWithUnmask \unmask ->
      handleAll
        do \ex -> fail $ "coreAsyncImplementation thread failed: " <> displayException ex
          result <- try $ catchAll
            do action unmask
            \ex -> do
              -- Rewrite exception if its the cancel exception for this async
              when (fromException ex == Just (CancelAsync key)) $ throwIO AsyncDisposed
              throwIO $ AsyncException ex

          -- The `action` has completed its work.
          -- "disarm" dispose:
            do \(CancelAsync exKey) -> key == exKey
            do
              atomically $ readTVar stateVar >>= \case
                TaskStateInitializing -> retry
                TaskStateRunning _ -> writeTVar stateVar TaskStateCompleted
                TaskStateThrowing -> retry -- Could not disarm so we have to wait for the exception to arrive
                TaskStateCompleted -> pure ()
            do mempty -- ignore exception if it matches; this can only happen once (see TaskStateThrowing above)
              Left (fromException -> Just AsyncDisposed) -> pure ()
              Left ex -> handler ex
              _ -> pure ()
            \ex -> traceIO $ "An exception was thrown while handling an async exception: " <> displayException ex
          atomically do
            putAsyncVarEitherSTM_ resultVar result
            defaultRunFinalizers finalizers

    atomically $ writeTVar stateVar $ TaskStateRunning threadId

    pure $ Task key stateVar finalizers (toAwaitable resultVar)

unmanagedAsync :: MonadIO m => IO a -> m (Task a)
unmanagedAsync action = unmanagedAsyncWithUnmask \unmask -> unmask action

unmanagedAsync_ :: MonadIO m => IO () -> m Disposable
unmanagedAsync_ action = toDisposable <$> unmanagedAsync action

unmanagedAsyncWithUnmask :: MonadIO m => ((forall b. IO b -> IO b) -> IO a) -> m (Task a)
unmanagedAsyncWithUnmask = coreAsyncImplementation (traceIO . ("Unhandled exception in unmanaged async: " <>) . displayException)

unmanagedAsyncWithUnmask_ :: MonadIO m => ((forall b. IO b -> IO b) -> IO ()) -> m Disposable
unmanagedAsyncWithUnmask_ action = toDisposable <$> unmanagedAsyncWithUnmask action