Skip to content
Snippets Groups Projects
Commit a0df9d46 authored by Jens Nolte's avatar Jens Nolte
Browse files

Add unmanaged async variants


Co-authored-by: default avatarJan Beinke <git@janbeinke.com>
parent 778bfbec
No related branches found
No related tags found
No related merge requests found
Pipeline #2779 passed
module Quasar.Async (
Async,
Async(..),
async,
async_,
asyncWithUnmask,
......@@ -15,11 +15,16 @@ module Quasar.Async (
-- ** IO variant
async',
asyncWithUnmask',
-- ** Unmanaged variants
unmanagedAsync,
unmanagedAsyncWithUnmask,
) where
import Control.Concurrent (ThreadId)
import Control.Monad.Catch
import Quasar.Async.Fork
import Quasar.Async.STMHelper
import Quasar.Future
import Quasar.Exceptions
import Quasar.MonadQuasar
......@@ -63,17 +68,31 @@ async' fn = asyncWithUnmask' ($ fn)
asyncWithUnmask' :: forall a m. (MonadQuasar m, MonadIO m) => ((forall b. IO b -> IO b) -> IO a) -> m (Async a)
asyncWithUnmask' fn = liftQuasarIO do
exChan <- askExceptionSink
worker <- askIOWorker
exSink <- askExceptionSink
spawnAsync registerResourceIO worker exSink fn
unmanagedAsync :: forall a m. MonadIO m => TIOWorker -> ExceptionSink -> IO a -> m (Async a)
unmanagedAsync worker exSink fn = liftIO $ unmanagedAsyncWithUnmask worker exSink ($ fn)
unmanagedAsyncWithUnmask :: forall a m. MonadIO m => TIOWorker -> ExceptionSink -> ((forall b. IO b -> IO b) -> IO a) -> m (Async a)
unmanagedAsyncWithUnmask worker exSink fn = liftIO $ spawnAsync (\_ -> pure ()) worker exSink fn
spawnAsync :: forall a m. (MonadIO m, MonadMask m) => (Disposer -> m ()) -> TIOWorker -> ExceptionSink -> ((forall b. IO b -> IO b) -> IO a) -> m (Async a)
spawnAsync registerDisposerFn worker exSink fn = do
key <- liftIO newUnique
resultVar <- newPromise
afixExtra \threadIdFuture -> mask_ do
-- Disposer is created first to ensure the resource can be safely attached
disposer <- registerDisposeActionIO (disposeFn key resultVar threadIdFuture)
disposer <- atomically $ newUnmanagedIODisposer (disposeFn key resultVar threadIdFuture) worker exSink
registerDisposerFn disposer
threadId <- liftIO $ forkWithUnmask (runAndPut exChan key resultVar disposer) exChan
threadId <- liftIO $ forkWithUnmask (runAndPut exSink key resultVar disposer) exSink
pure (Async (toFuture resultVar) disposer, threadId)
where
......@@ -94,7 +113,7 @@ asyncWithUnmask' fn = liftQuasarIO do
disposeEventuallyIO_ disposer
disposeFn :: Unique -> Promise a -> Future ThreadId -> IO ()
disposeFn key resultVar threadIdFuture = do
-- Should not block or fail (unless the TIOWorker is broken)
-- ThreadId future will be filled by afix
threadId <- await threadIdFuture
throwTo threadId (CancelAsync key)
-- Disposing is considered complete once a result (i.e. success or failure) has been stored
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment