diff --git a/src/Quasar/Async.hs b/src/Quasar/Async.hs index ebffedf6b8d701378735a0eef9524cb4ef9472be..3a51b99182e08d598dfab7ce3c6abf3ffab11a9f 100644 --- a/src/Quasar/Async.hs +++ b/src/Quasar/Async.hs @@ -1,5 +1,5 @@ module Quasar.Async ( - Async, + Async(..), async, async_, asyncWithUnmask, @@ -15,11 +15,16 @@ module Quasar.Async ( -- ** IO variant async', asyncWithUnmask', + + -- ** Unmanaged variants + unmanagedAsync, + unmanagedAsyncWithUnmask, ) where import Control.Concurrent (ThreadId) import Control.Monad.Catch import Quasar.Async.Fork +import Quasar.Async.STMHelper import Quasar.Future import Quasar.Exceptions import Quasar.MonadQuasar @@ -63,17 +68,31 @@ async' fn = asyncWithUnmask' ($ fn) asyncWithUnmask' :: forall a m. (MonadQuasar m, MonadIO m) => ((forall b. IO b -> IO b) -> IO a) -> m (Async a) asyncWithUnmask' fn = liftQuasarIO do - exChan <- askExceptionSink + worker <- askIOWorker + exSink <- askExceptionSink + spawnAsync registerResourceIO worker exSink fn + + +unmanagedAsync :: forall a m. MonadIO m => TIOWorker -> ExceptionSink -> IO a -> m (Async a) +unmanagedAsync worker exSink fn = liftIO $ unmanagedAsyncWithUnmask worker exSink ($ fn) +unmanagedAsyncWithUnmask :: forall a m. MonadIO m => TIOWorker -> ExceptionSink -> ((forall b. IO b -> IO b) -> IO a) -> m (Async a) +unmanagedAsyncWithUnmask worker exSink fn = liftIO $ spawnAsync (\_ -> pure ()) worker exSink fn + + +spawnAsync :: forall a m. (MonadIO m, MonadMask m) => (Disposer -> m ()) -> TIOWorker -> ExceptionSink -> ((forall b. IO b -> IO b) -> IO a) -> m (Async a) +spawnAsync registerDisposerFn worker exSink fn = do key <- liftIO newUnique resultVar <- newPromise afixExtra \threadIdFuture -> mask_ do -- Disposer is created first to ensure the resource can be safely attached - disposer <- registerDisposeActionIO (disposeFn key resultVar threadIdFuture) + disposer <- atomically $ newUnmanagedIODisposer (disposeFn key resultVar threadIdFuture) worker exSink + + registerDisposerFn disposer - threadId <- liftIO $ forkWithUnmask (runAndPut exChan key resultVar disposer) exChan + threadId <- liftIO $ forkWithUnmask (runAndPut exSink key resultVar disposer) exSink pure (Async (toFuture resultVar) disposer, threadId) where @@ -94,7 +113,7 @@ asyncWithUnmask' fn = liftQuasarIO do disposeEventuallyIO_ disposer disposeFn :: Unique -> Promise a -> Future ThreadId -> IO () disposeFn key resultVar threadIdFuture = do - -- Should not block or fail (unless the TIOWorker is broken) + -- ThreadId future will be filled by afix threadId <- await threadIdFuture throwTo threadId (CancelAsync key) -- Disposing is considered complete once a result (i.e. success or failure) has been stored