From a0df9d46f4920b29abf2ee18d3a67f6b3ef4758f Mon Sep 17 00:00:00 2001 From: Jens Nolte <git@queezle.net> Date: Sun, 3 Apr 2022 02:04:51 +0200 Subject: [PATCH] Add unmanaged async variants Co-authored-by: Jan Beinke <git@janbeinke.com> --- src/Quasar/Async.hs | 29 ++++++++++++++++++++++++----- 1 file changed, 24 insertions(+), 5 deletions(-) diff --git a/src/Quasar/Async.hs b/src/Quasar/Async.hs index ebffedf..3a51b99 100644 --- a/src/Quasar/Async.hs +++ b/src/Quasar/Async.hs @@ -1,5 +1,5 @@ module Quasar.Async ( - Async, + Async(..), async, async_, asyncWithUnmask, @@ -15,11 +15,16 @@ module Quasar.Async ( -- ** IO variant async', asyncWithUnmask', + + -- ** Unmanaged variants + unmanagedAsync, + unmanagedAsyncWithUnmask, ) where import Control.Concurrent (ThreadId) import Control.Monad.Catch import Quasar.Async.Fork +import Quasar.Async.STMHelper import Quasar.Future import Quasar.Exceptions import Quasar.MonadQuasar @@ -63,17 +68,31 @@ async' fn = asyncWithUnmask' ($ fn) asyncWithUnmask' :: forall a m. (MonadQuasar m, MonadIO m) => ((forall b. IO b -> IO b) -> IO a) -> m (Async a) asyncWithUnmask' fn = liftQuasarIO do - exChan <- askExceptionSink + worker <- askIOWorker + exSink <- askExceptionSink + spawnAsync registerResourceIO worker exSink fn + + +unmanagedAsync :: forall a m. MonadIO m => TIOWorker -> ExceptionSink -> IO a -> m (Async a) +unmanagedAsync worker exSink fn = liftIO $ unmanagedAsyncWithUnmask worker exSink ($ fn) +unmanagedAsyncWithUnmask :: forall a m. MonadIO m => TIOWorker -> ExceptionSink -> ((forall b. IO b -> IO b) -> IO a) -> m (Async a) +unmanagedAsyncWithUnmask worker exSink fn = liftIO $ spawnAsync (\_ -> pure ()) worker exSink fn + + +spawnAsync :: forall a m. (MonadIO m, MonadMask m) => (Disposer -> m ()) -> TIOWorker -> ExceptionSink -> ((forall b. IO b -> IO b) -> IO a) -> m (Async a) +spawnAsync registerDisposerFn worker exSink fn = do key <- liftIO newUnique resultVar <- newPromise afixExtra \threadIdFuture -> mask_ do -- Disposer is created first to ensure the resource can be safely attached - disposer <- registerDisposeActionIO (disposeFn key resultVar threadIdFuture) + disposer <- atomically $ newUnmanagedIODisposer (disposeFn key resultVar threadIdFuture) worker exSink + + registerDisposerFn disposer - threadId <- liftIO $ forkWithUnmask (runAndPut exChan key resultVar disposer) exChan + threadId <- liftIO $ forkWithUnmask (runAndPut exSink key resultVar disposer) exSink pure (Async (toFuture resultVar) disposer, threadId) where @@ -94,7 +113,7 @@ asyncWithUnmask' fn = liftQuasarIO do disposeEventuallyIO_ disposer disposeFn :: Unique -> Promise a -> Future ThreadId -> IO () disposeFn key resultVar threadIdFuture = do - -- Should not block or fail (unless the TIOWorker is broken) + -- ThreadId future will be filled by afix threadId <- await threadIdFuture throwTo threadId (CancelAsync key) -- Disposing is considered complete once a result (i.e. success or failure) has been stored -- GitLab