From a1a49d94399d605b1040bad2cbed2ed4d9aadbf6 Mon Sep 17 00:00:00 2001 From: Jens Nolte <git@queezle.net> Date: Sun, 5 Sep 2021 04:11:41 +0200 Subject: [PATCH] Introduce AsyncContext Co-authored-by: Jan Beinke <git@janbeinke.com> --- src/Quasar/Async.hs | 125 +++++++++++++++++++++++++------------------- 1 file changed, 70 insertions(+), 55 deletions(-) diff --git a/src/Quasar/Async.hs b/src/Quasar/Async.hs index 1457f4d..a66da1d 100644 --- a/src/Quasar/Async.hs +++ b/src/Quasar/Async.hs @@ -2,9 +2,17 @@ module Quasar.Async ( -- * Async/await MonadAsync(..), runUnlimitedAsync, + async, async_, + asyncWithUnmask, asyncWithUnmask_, + -- ** Async context + IsAsyncContext(..), + AsyncContext, + unlimitedAsyncContext, + runUnlimitedAsync, + -- * Unmanaged forking forkTask, forkTask_, @@ -21,75 +29,86 @@ import Quasar.Disposable import Quasar.Prelude -class MonadResourceManager m => MonadAsync m where - async :: m r -> m (Awaitable r) - async action = asyncWithUnmask ($ action) - -- | TODO: Documentation - -- - -- The action will be run with asynchronous exceptions masked and will be passed an action that can be used unmask. - -- - -- TODO change signature to `Awaitable` - asyncWithUnmask :: ((forall a. m a -> m a) -> m r) -> m (Awaitable r) +class IsAsyncContext a where + asyncOnContextWithUnmask :: MonadResourceManager m => a -> (forall f. MonadAsync f => (forall a. f a -> f a) -> f r) -> m (Awaitable r) + asyncOnContextWithUnmask self = asyncOnContextWithUnmask (toAsyncContext self) + toAsyncContext :: a -> AsyncContext + toAsyncContext = AsyncContext -instance MonadAsync m => MonadAsync (ReaderT r m) where - asyncWithUnmask :: ((forall b. ReaderT r m b -> ReaderT r m b) -> ReaderT r m a) -> ReaderT r m (Awaitable a) - asyncWithUnmask action = do - x <- ask - lift $ asyncWithUnmask \unmask -> runReaderT (action (liftUnmask unmask)) x - where - -- | Lift an "unmask" action (e.g. from `mask`) into a `ReaderT`. - liftUnmask :: (m a -> m a) -> (ReaderT r m) a -> (ReaderT r m) a - liftUnmask unmask innerAction = do - value <- ask - lift $ unmask $ runReaderT innerAction value + {-# MINIMAL toAsyncContext | asyncOnContextWithUnmask #-} +data AsyncContext = forall a. IsAsyncContext a => AsyncContext a -async_ :: MonadAsync m => m () -> m () -async_ = void . async +instance IsAsyncContext AsyncContext where + asyncOnContextWithUnmask (AsyncContext ctx) = asyncOnContextWithUnmask ctx -asyncWithUnmask_ :: MonadAsync m => ((forall a. m a -> m a) -> m ()) -> m () -asyncWithUnmask_ action = void $ asyncWithUnmask action +data UnlimitedAsyncContext = UnlimitedAsyncContext +unlimitedAsyncContext :: AsyncContext +unlimitedAsyncContext = toAsyncContext UnlimitedAsyncContext -newtype UnlimitedAsync r = UnlimitedAsync { unUnlimitedAsync :: (ReaderT ResourceManager IO r) } - deriving newtype ( - Functor, - Applicative, - Monad, - MonadIO, - MonadThrow, - MonadCatch, - MonadMask, - MonadFail, - Alternative, - MonadPlus, - MonadAwait, - MonadResourceManager - ) - -instance MonadAsync UnlimitedAsync where - asyncWithUnmask action = do +instance IsAsyncContext UnlimitedAsyncContext where + asyncOnContextWithUnmask UnlimitedAsyncContext action = do resourceManager <- askResourceManager - mask_ $ do - task <- liftIO $ forkTaskWithUnmask (\unmask -> runReaderT (unUnlimitedAsync (action (liftUnmask unmask))) resourceManager) + let asyncContext = unlimitedAsyncContext + mask_ do + task <- forkTaskWithUnmask (\unmask -> runReaderT (runReaderT (action (liftUnmask unmask)) asyncContext) resourceManager) registerDisposable task pure $ toAwaitable task where - liftUnmask :: (forall b. IO b -> IO b) -> UnlimitedAsync a -> UnlimitedAsync a - liftUnmask unmask (UnlimitedAsync innerAction) = UnlimitedAsync do - resourceManager <- ask - liftIO $ unmask $ runReaderT innerAction resourceManager + liftUnmask :: (forall b. IO b -> IO b) -> ReaderT AsyncContext (ReaderT ResourceManager IO) a -> ReaderT AsyncContext (ReaderT ResourceManager IO) a + liftUnmask unmask innerAction = do + resourceManager <- askResourceManager + asyncContext <- askAsyncContext + liftIO $ unmask $ runReaderT (runReaderT innerAction asyncContext) resourceManager + + +class MonadResourceManager m => MonadAsync m where + askAsyncContext :: m AsyncContext + + localAsyncContext :: IsAsyncContext a => a -> m r -> m r + + +instance MonadResourceManager m => MonadAsync (ReaderT AsyncContext m) where + askAsyncContext = ask + localAsyncContext = local . const . toAsyncContext + +instance {-# OVERLAPPABLE #-} MonadAsync m => MonadAsync (ReaderT r m) where + askAsyncContext = lift askAsyncContext + localAsyncContext asyncContext action = do + x <- ask + lift $ localAsyncContext asyncContext $ runReaderT action x + + +-- | TODO: Documentation +-- +-- The action will be run with asynchronous exceptions masked and will be passed an action that can be used unmask. +-- +-- TODO change signature to `Awaitable` +async :: MonadAsync m => (forall f. MonadAsync f => f a) -> m (Awaitable a) +async action = asyncWithUnmask ($ action) + +asyncWithUnmask :: MonadAsync m => (forall f. MonadAsync f => (forall a. f a -> f a) -> f r) -> m (Awaitable r) +asyncWithUnmask action = do + asyncContext <- askAsyncContext + asyncOnContextWithUnmask asyncContext action + +async_ :: MonadAsync m => (forall f. MonadAsync f => f ()) -> m () +async_ action = void $ async action + +asyncWithUnmask_ :: MonadAsync m => (forall f. MonadAsync f => (forall a. f a -> f a) -> f ()) -> m () +asyncWithUnmask_ action = void $ asyncWithUnmask action + -- | Run a computation in `MonadAsync` where `async` is implemented without any thread limits (i.e. every `async` will -- fork a new (RTS) thread). -runUnlimitedAsync :: (MonadResourceManager m) => (forall f. MonadAsync f => f r) -> m r +runUnlimitedAsync :: (MonadResourceManager m) => ReaderT AsyncContext m a -> m a runUnlimitedAsync action = do - resourceManager <- askResourceManager - liftIO $ runReaderT (unUnlimitedAsync action) resourceManager + runReaderT action unlimitedAsyncContext @@ -138,7 +157,3 @@ forkTaskWithUnmask action = do forkTaskWithUnmask_ :: MonadIO m => ((forall b. IO b -> IO b) -> IO ()) -> m Disposable forkTaskWithUnmask_ action = toDisposable <$> forkTaskWithUnmask action - - - - -- GitLab