module Quasar.Async ( Async, async, async_, asyncWithUnmask, asyncWithUnmask_, -- ** Async exceptions CancelAsync(..), AsyncDisposed(..), AsyncException(..), isCancelAsync, isAsyncDisposed, -- ** IO variant async', asyncWithUnmask', ) where import Control.Concurrent (ThreadId) import Control.Concurrent.STM import Control.Monad.Catch import Quasar.Async.Fork import Quasar.Awaitable import Quasar.Exceptions import Quasar.MonadQuasar import Quasar.Prelude import Quasar.Resources import Quasar.Resources.Disposer import Quasar.Utils.ShortIO import Control.Monad.Reader data Async a = Async (Awaitable a) Disposer instance Resource (Async a) where getDisposer (Async _ disposer) = disposer instance IsAwaitable a (Async a) where toAwaitable (Async awaitable _) = awaitable async :: MonadQuasar m => QuasarIO a -> m (Async a) async fn = asyncWithUnmask ($ fn) async_ :: MonadQuasar m => QuasarIO () -> m () async_ fn = void $ asyncWithUnmask ($ fn) asyncWithUnmask :: MonadQuasar m => ((forall b. QuasarIO b -> QuasarIO b) -> QuasarIO a) -> m (Async a) asyncWithUnmask fn = do quasar <- askQuasar asyncWithUnmask' (\unmask -> runReaderT (fn (liftUnmask unmask)) quasar) where liftUnmask :: (forall b. IO b -> IO b) -> QuasarIO a -> QuasarIO a liftUnmask unmask innerAction = do quasar <- askQuasar liftIO $ unmask $ runReaderT innerAction quasar asyncWithUnmask_ :: MonadQuasar m => ((forall b. QuasarIO b -> QuasarIO b) -> QuasarIO ()) -> m () asyncWithUnmask_ fn = void $ asyncWithUnmask fn async' :: MonadQuasar m => IO a -> m (Async a) async' fn = asyncWithUnmask' ($ fn) asyncWithUnmask' :: forall a m. MonadQuasar m => ((forall b. IO b -> IO b) -> IO a) -> m (Async a) asyncWithUnmask' fn = maskIfRequired do worker <- askIOWorker exChan <- askExceptionSink (key, resultVar, threadIdVar, disposer) <- ensureSTM do key <- newUniqueSTM resultVar <- newAsyncVarSTM threadIdVar <- newAsyncVarSTM -- Disposer is created first to ensure the resource can be safely attached disposer <- newUnmanagedPrimitiveDisposer (disposeFn key resultVar (toAwaitable threadIdVar)) worker exChan pure (key, resultVar, threadIdVar, disposer) registerResource disposer startShortIO_ do threadId <- forkWithUnmaskShortIO (runAndPut exChan key resultVar disposer) exChan putAsyncVarShortIO_ threadIdVar threadId pure $ Async (toAwaitable resultVar) disposer where runAndPut :: ExceptionSink -> Unique -> AsyncVar a -> Disposer -> (forall b. IO b -> IO b) -> IO () runAndPut exChan key resultVar disposer unmask = do -- Called in masked state by `forkWithUnmask` result <- try $ fn unmask case result of Left (fromException -> Just (CancelAsync ((== key) -> True))) -> failAsyncVar_ resultVar AsyncDisposed Left ex -> do atomically (throwToExceptionSink exChan ex) `finally` do failAsyncVar_ resultVar (AsyncException ex) atomically $ disposeEventuallySTM_ disposer Right retVal -> do putAsyncVar_ resultVar retVal atomically $ disposeEventuallySTM_ disposer disposeFn :: Unique -> AsyncVar a -> Awaitable ThreadId -> ShortIO (Awaitable ()) disposeFn key resultVar threadIdAwaitable = do -- Should not block or fail (unless the TIOWorker is broken) threadId <- unsafeShortIO $ await threadIdAwaitable throwToShortIO threadId (CancelAsync key) -- Considered complete once a result (i.e. success or failure) has been stored pure (awaitSuccessOrFailure resultVar)