From a717996612900cec461a0ab819ebb0a26cf633f7 Mon Sep 17 00:00:00 2001 From: Jens Nolte <git@queezle.net> Date: Sun, 29 Aug 2021 00:42:22 +0200 Subject: [PATCH] Clean up async/await by introducing MonadAsync/MonadAwait Co-authored-by: Jan Beinke <git@janbeinke.com> --- src/Quasar/Async.hs | 178 +++++++++------------------------- src/Quasar/Awaitable.hs | 22 ++++- src/Quasar/Disposable.hs | 45 ++++++--- src/Quasar/Observable.hs | 4 +- test/Quasar/AsyncSpec.hs | 44 ++------- test/Quasar/AwaitableSpec.hs | 12 +-- test/Quasar/DisposableSpec.hs | 27 +++--- 7 files changed, 129 insertions(+), 203 deletions(-) diff --git a/src/Quasar/Async.hs b/src/Quasar/Async.hs index 68d0aaf..9cfbc46 100644 --- a/src/Quasar/Async.hs +++ b/src/Quasar/Async.hs @@ -1,10 +1,7 @@ module Quasar.Async ( -- * Async/await MonadAsync(..), - AsyncIO, async, - await, - awaitResult, -- * Task Task, @@ -15,15 +12,9 @@ module Quasar.Async ( successfulTask, failedTask, - -- * AsyncManager - AsyncManager, - AsyncManagerConfiguraiton(..), - withAsyncManager, - withDefaultAsyncManager, - withUnlimitedAsyncManager, - newAsyncManager, - defaultAsyncManagerConfiguration, - unlimitedAsyncManagerConfiguration, + -- ** Task exceptions + CancelTask(..), + TaskDisposed(..), ) where import Control.Concurrent (ThreadId, forkIOWithUnmask, throwTo) @@ -36,93 +27,60 @@ import Quasar.Disposable import Quasar.Prelude --- | A monad for actions that run on a thread bound to a `AsyncManager`. -newtype AsyncIO a = AsyncIO (ReaderT AsyncManager IO a) - deriving newtype (Functor, Applicative, Monad, MonadIO, MonadThrow, MonadCatch, MonadMask) +class (MonadAwait m, MonadResourceManager m, MonadCatch m) => MonadAsync m where + -- | TODO + async :: m r -> m (Task r) + async action = asyncWithUnmask ($ action) + -- | TODO + -- + -- The action will be run with asynchronous exceptions masked and will be passed an action that can be used unmask. + asyncWithUnmask :: ((forall a. m a -> m a) -> m r) -> m (Task r) --- | Run the synchronous part of an `AsyncIO` and then return an `Awaitable` that can be used to wait for completion of the synchronous part. -async :: MonadAsync m => AsyncIO r -> m (Task r) -async action = asyncWithUnmask (\unmask -> unmask action) +instance MonadAsync (ReaderT ResourceManager IO) where + asyncWithUnmask action = do + resourceManager <- askResourceManager --- | Run the synchronous part of an `AsyncIO` and then return an `Awaitable` that can be used to wait for completion of the synchronous part. --- --- The action will be run with asynchronous exceptions masked and will be passed an action that can be used unmask. -asyncWithUnmask :: MonadAsync m => ((forall a. AsyncIO a -> AsyncIO a) -> AsyncIO r) -> m (Task r) --- TODO resource limits -asyncWithUnmask action = do - asyncManager <- askAsyncManager + liftIO $ mask_ do + resultVar <- newAsyncVar + threadIdVar <- newEmptyTMVarIO - liftIO $ mask_ do - resultVar <- newAsyncVar - threadIdVar <- newEmptyTMVarIO + disposable <- attachDisposeAction resourceManager (disposeTask threadIdVar resultVar) - disposable <- attachDisposeAction (getResourceManager asyncManager) (disposeTask threadIdVar resultVar) + onException + do + atomically . putTMVar threadIdVar . Just =<< + forkIOWithUnmask \unmask -> do + result <- try $ catch + do runReaderT (action (liftUnmask unmask)) resourceManager + \CancelTask -> throwIO TaskDisposed - onException - do - atomically . putTMVar threadIdVar . Just =<< - forkIOWithUnmask \unmask -> do - result <- try $ catch - do runOnAsyncManager asyncManager (action (liftUnmask unmask)) - \CancelTask -> throwIO TaskDisposed + putAsyncVarEither_ resultVar result - putAsyncVarEither_ resultVar result + -- Thread has completed work, "disarm" the disposable and fire it + void $ atomically $ swapTMVar threadIdVar Nothing + disposeIO disposable - -- Thread has completed work, "disarm" the disposable and fire it - void $ atomically $ swapTMVar threadIdVar Nothing - disposeIO disposable + do atomically $ putTMVar threadIdVar Nothing - do atomically $ putTMVar threadIdVar Nothing + pure $ Task disposable (toAwaitable resultVar) + where + disposeTask :: TMVar (Maybe ThreadId) -> AsyncVar r -> IO (Awaitable ()) + disposeTask threadIdVar resultVar = mask_ do + -- Blocks until the thread is forked + atomically (swapTMVar threadIdVar Nothing) >>= \case + -- Thread completed or initialization failed + Nothing -> pure () + Just threadId -> throwTo threadId CancelTask - pure $ Task disposable (toAwaitable resultVar) - where - disposeTask :: TMVar (Maybe ThreadId) -> AsyncVar r -> IO (Awaitable ()) - disposeTask threadIdVar resultVar = mask_ do - -- Blocks until the thread is forked - atomically (swapTMVar threadIdVar Nothing) >>= \case - -- Thread completed or initialization failed - Nothing -> pure () - Just threadId -> throwTo threadId CancelTask + -- Wait for task completion or failure. Tasks must not ignore `CancelTask` or this will hang. + pure $ void (toAwaitable resultVar) `catchAll` const (pure ()) - -- Wait for task completion or failure. Tasks must not ignore `CancelTask` or this will hang. - pure $ void (toAwaitable resultVar) `catchAll` const (pure ()) + liftUnmask :: (IO a -> IO a) -> (ReaderT ResourceManager IO) a -> (ReaderT ResourceManager IO) a + liftUnmask unmask action = do + resourceManager <- askResourceManager + liftIO $ unmask $ runReaderT action resourceManager -liftUnmask :: (IO a -> IO a) -> AsyncIO a -> AsyncIO a -liftUnmask unmask action = do - asyncManager <- askAsyncManager - liftIO $ unmask $ runOnAsyncManager asyncManager action - -await :: IsAwaitable r a => a -> AsyncIO r --- TODO resource limits -await = liftIO . awaitIO - - -class MonadIO m => MonadAsync m where - askAsyncManager :: m AsyncManager - -instance MonadAsync AsyncIO where - askAsyncManager = AsyncIO ask - -instance MonadIO m => MonadAsync (ReaderT AsyncManager m) where - askAsyncManager = ask - - -awaitResult :: IsAwaitable r a => AsyncIO a -> AsyncIO r -awaitResult = (await =<<) - --- TODO rename to AsyncContext -data AsyncManager = AsyncManager { - resourceManager :: ResourceManager, - configuration :: AsyncManagerConfiguraiton, - threads :: TVar (HashSet ThreadId) -} - -instance IsDisposable AsyncManager where - toDisposable = toDisposable . getResourceManager - -instance HasResourceManager AsyncManager where - getResourceManager = resourceManager -- | A task that is running asynchronously. It has a result and can fail. @@ -148,7 +106,7 @@ cancelTask :: Task r -> IO (Awaitable ()) cancelTask = dispose cancelTaskIO :: Task r -> IO () -cancelTaskIO = awaitIO <=< dispose +cancelTaskIO = await <=< dispose -- | Creates an `Task` from an `Awaitable`. -- The resulting task only depends on an external resource, so disposing it has no effect. @@ -174,47 +132,3 @@ instance Exception CancelTask where data TaskDisposed = TaskDisposed deriving stock Show instance Exception TaskDisposed where - - -data AsyncManagerConfiguraiton = AsyncManagerConfiguraiton { - maxThreads :: Maybe Int -} - -defaultAsyncManagerConfiguration :: AsyncManagerConfiguraiton -defaultAsyncManagerConfiguration = AsyncManagerConfiguraiton { - maxThreads = Just 1 -} - -unlimitedAsyncManagerConfiguration :: AsyncManagerConfiguraiton -unlimitedAsyncManagerConfiguration = AsyncManagerConfiguraiton { - maxThreads = Nothing -} - -withAsyncManager :: AsyncManagerConfiguraiton -> AsyncIO r -> IO r -withAsyncManager configuration = bracket (unsafeNewAsyncManager configuration) (awaitIO <=< dispose) . flip runOnAsyncManager - -runOnAsyncManager :: AsyncManager -> AsyncIO r -> IO r --- TODO resource limits -runOnAsyncManager asyncManager (AsyncIO action) = runReaderT action asyncManager - -withDefaultAsyncManager :: AsyncIO a -> IO a -withDefaultAsyncManager = withAsyncManager defaultAsyncManagerConfiguration - -withUnlimitedAsyncManager :: AsyncIO a -> IO a -withUnlimitedAsyncManager = withAsyncManager unlimitedAsyncManagerConfiguration - -newAsyncManager :: ResourceManager -> AsyncManagerConfiguraiton -> IO AsyncManager -newAsyncManager parent configuraton = mask_ do - asyncManager <- unsafeNewAsyncManager configuraton - attachDisposable parent asyncManager - pure asyncManager - -unsafeNewAsyncManager :: AsyncManagerConfiguraiton -> IO AsyncManager -unsafeNewAsyncManager configuration = do - resourceManager <- unsafeNewResourceManager - threads <- newTVarIO mempty - pure AsyncManager { - resourceManager, - configuration, - threads - } diff --git a/src/Quasar/Awaitable.hs b/src/Quasar/Awaitable.hs index 1c380cc..1292986 100644 --- a/src/Quasar/Awaitable.hs +++ b/src/Quasar/Awaitable.hs @@ -2,8 +2,8 @@ module Quasar.Awaitable ( -- * Awaitable IsAwaitable(..), Awaitable, - awaitIO, - tryAwaitIO, + MonadAwait(..), + awaitResult, peekAwaitable, successfulAwaitable, failedAwaitable, @@ -70,12 +70,24 @@ class IsAwaitable r a | a -> r where {-# MINIMAL toAwaitable | (runAwaitable, cacheAwaitable) #-} + +class Monad m => MonadAwait m where + await :: IsAwaitable r a => a -> m r + +instance MonadAwait IO where + await = awaitIO + +instance MonadAwait m => MonadAwait (ReaderT a m) where + await = lift . await + + +awaitResult :: (IsAwaitable r a, MonadAwait m) => m a -> m r +awaitResult = (await =<<) + + awaitIO :: (IsAwaitable r a, MonadIO m) => a -> m r awaitIO awaitable = liftIO $ runQueryT atomically (runAwaitable awaitable) -tryAwaitIO :: (IsAwaitable r a, MonadIO m) => a -> m (Either SomeException r) -tryAwaitIO awaitable = liftIO $ try $ awaitIO awaitable - peekAwaitable :: (IsAwaitable r a, MonadIO m) => a -> m (Maybe (Either SomeException r)) peekAwaitable awaitable = liftIO $ runMaybeT $ try $ runQueryT queryFn (runAwaitable awaitable) where diff --git a/src/Quasar/Disposable.hs b/src/Quasar/Disposable.hs index bccd3b3..2c11824 100644 --- a/src/Quasar/Disposable.hs +++ b/src/Quasar/Disposable.hs @@ -11,9 +11,12 @@ module Quasar.Disposable ( -- ** ResourceManager ResourceManager, HasResourceManager(..), + MonadResourceManager(..), withResourceManager, + withOnResourceManager, newResourceManager, unsafeNewResourceManager, + onResourceManager, attachDisposable, attachDisposeAction, attachDisposeAction_, @@ -39,6 +42,7 @@ class IsDisposable a where -- TODO document laws: must not throw exceptions, is idempotent -- | Dispose a resource. + -- TODO MonadIO dispose :: a -> IO (Awaitable ()) dispose = dispose . toDisposable @@ -52,7 +56,7 @@ class IsDisposable a where -- | Dispose a resource in the IO monad. disposeIO :: IsDisposable a => a -> IO () -disposeIO = awaitIO <=< dispose +disposeIO = await <=< dispose instance IsDisposable a => IsDisposable (Maybe a) where toDisposable = maybe noDisposable toDisposable @@ -170,6 +174,7 @@ instance IsAwaitable () ResourceManagerEntry where Nothing -> pure () Just (awaitable, _) -> awaitable + newEntry :: IsDisposable a => a -> IO ResourceManagerEntry newEntry disposable = do disposedAwaitable <- cacheAwaitable (isDisposed disposable) @@ -196,6 +201,24 @@ entryIsEmpty :: ResourceManagerEntry -> STM Bool entryIsEmpty (ResourceManagerEntry var) = isEmptyTMVar var +class HasResourceManager a where + getResourceManager :: a -> ResourceManager + +instance HasResourceManager ResourceManager where + getResourceManager = id + +class MonadIO m => MonadResourceManager m where + askResourceManager :: m ResourceManager + +instance MonadIO m => MonadResourceManager (ReaderT ResourceManager m) where + askResourceManager = ask + + +onResourceManager :: (HasResourceManager a, MonadIO m) => a -> ReaderT ResourceManager m r -> m r +onResourceManager target action = runReaderT action (getResourceManager target) + + + data ResourceManager = ResourceManager { disposingVar :: TVar Bool, disposedVar :: TVar Bool, @@ -203,9 +226,6 @@ data ResourceManager = ResourceManager { entriesVar :: TVar (Seq ResourceManagerEntry) } -class HasResourceManager a where - getResourceManager :: a -> ResourceManager - instance IsDisposable ResourceManager where dispose resourceManager = mask \unmask -> unmask dispose' `catchAll` \ex -> setException resourceManager ex >> throwIO ex @@ -227,17 +247,20 @@ instance IsDisposable ResourceManager where `orElse` ((\disposed -> unless disposed retry) =<< readTVar (disposedVar resourceManager)) -withResourceManager :: (ResourceManager -> IO a) -> IO a -withResourceManager = bracket unsafeNewResourceManager (awaitIO <=< dispose) +withResourceManager :: (MonadAwait m, MonadMask m, MonadIO m) => (ResourceManager -> m a) -> m a +withResourceManager = bracket unsafeNewResourceManager (await <=< liftIO . dispose) + +withOnResourceManager :: (MonadAwait m, MonadMask m, MonadIO m) => (ReaderT ResourceManager m a) -> m a +withOnResourceManager action = withResourceManager \resourceManager -> onResourceManager resourceManager action -newResourceManager :: ResourceManager -> IO ResourceManager -newResourceManager parent = mask_ do +newResourceManager :: MonadIO m => ResourceManager -> m ResourceManager +newResourceManager parent = liftIO $ mask_ do resourceManager <- unsafeNewResourceManager attachDisposable parent resourceManager pure resourceManager -unsafeNewResourceManager :: IO ResourceManager -unsafeNewResourceManager = do +unsafeNewResourceManager :: MonadIO m => m ResourceManager +unsafeNewResourceManager = liftIO do disposingVar <- newTVarIO False disposedVar <- newTVarIO False exceptionVar <- newEmptyTMVarIO @@ -274,7 +297,7 @@ collectGarbage resourceManager = go -- Wait for any entry to complete or until a new entry is added let awaitables = (toAwaitable <$> toList snapshot) -- GC fails here when an waitable throws an exception - void $ awaitIO if Quasar.Prelude.null awaitables + void $ await if Quasar.Prelude.null awaitables then awaitAny2 listChanged isDisposing else awaitAny (listChanged :| awaitables) diff --git a/src/Quasar/Observable.hs b/src/Quasar/Observable.hs index d7776f8..4a7d525 100644 --- a/src/Quasar/Observable.hs +++ b/src/Quasar/Observable.hs @@ -68,7 +68,7 @@ class IsRetrievable v a | a -> v where retrieve :: MonadAsync m => a -> m (Task v) retrieveIO :: IsRetrievable v a => a -> IO v -retrieveIO x = awaitIO =<< withDefaultAsyncManager (retrieve x) +retrieveIO x = withOnResourceManager $ await =<< retrieve x class IsRetrievable v o => IsObservable v o | o -> v where observe :: o -> (ObservableMessage v -> IO ()) -> IO Disposable @@ -79,7 +79,7 @@ class IsRetrievable v o => IsObservable v o | o -> v where mapObservable :: (v -> a) -> o -> Observable a mapObservable f = Observable . MappedObservable f --- | Observe until the callback returns `False`. The callback will also be unsubscribed when the `AsyncManager` is disposed. +-- | (TODO) Observe until the callback returns `False`. The callback will also be unsubscribed when the `ResourceManager` is disposed. observeWhile :: (IsObservable v o, MonadAsync m) => o -> (ObservableMessage v -> IO Bool) -> m Disposable observeWhile observable callback = do --disposeVar <- liftIO $ newTVarIO False diff --git a/test/Quasar/AsyncSpec.hs b/test/Quasar/AsyncSpec.hs index 0ece2a9..8034d9f 100644 --- a/test/Quasar/AsyncSpec.hs +++ b/test/Quasar/AsyncSpec.hs @@ -7,58 +7,32 @@ import Prelude import Test.Hspec import Quasar.Async import Quasar.Awaitable +import Quasar.Disposable import System.Timeout spec :: Spec spec = parallel $ do - describe "AsyncIO" $ do - it "binds pure operations" $ do - withDefaultAsyncManager (pure () >>= \() -> pure ()) - - it "binds IO actions" $ do - m1 <- newEmptyMVar - m2 <- newEmptyMVar - withDefaultAsyncManager (liftIO (putMVar m1 ()) >>= \() -> liftIO (putMVar m2 ())) - tryTakeMVar m1 `shouldReturn` Just () - tryTakeMVar m2 `shouldReturn` Just () + describe "async" $ do + it "can pass a value through async and await" $ do + withOnResourceManager (await =<< async (pure 42)) `shouldReturn` (42 :: Int) it "can pass a value through async and await" $ do - withDefaultAsyncManager (await =<< async (pure 42 :: AsyncIO Int)) `shouldReturn` 42 + withOnResourceManager (await =<< async (liftIO (threadDelay 100000) >> pure 42)) `shouldReturn` (42 :: Int) + describe "await" $ do it "can await the result of an async that is completed later" $ do avar <- newAsyncVar :: IO (AsyncVar ()) void $ forkIO $ do threadDelay 100000 putAsyncVar_ avar () - withDefaultAsyncManager (await avar) + await avar it "can fmap the result of an already finished async" $ do - avar <- newAsyncVar :: IO (AsyncVar ()) - putAsyncVar_ avar () - withDefaultAsyncManager (id <$> await avar) - - it "can fmap the result of an async that is completed later" $ do - avar <- newAsyncVar :: IO (AsyncVar ()) - void $ forkIO $ do - threadDelay 100000 - putAsyncVar_ avar () - withDefaultAsyncManager (id <$> await avar) - - it "can bind the result of an already finished async" $ do - avar <- newAsyncVar :: IO (AsyncVar ()) - putAsyncVar_ avar () - withDefaultAsyncManager (await avar >>= pure) - - it "can bind the result of an async that is completed later" $ do - avar <- newAsyncVar :: IO (AsyncVar ()) - void $ forkIO $ do - threadDelay 100000 - putAsyncVar_ avar () - withDefaultAsyncManager (await avar >>= pure) + await (pure () :: Awaitable ()) :: IO () it "can terminate when encountering an asynchronous exception" $ do never <- newAsyncVar :: IO (AsyncVar ()) - result <- timeout 100000 $ withDefaultAsyncManager $ + result <- timeout 100000 $ withOnResourceManager $ await never result `shouldBe` Nothing diff --git a/test/Quasar/AwaitableSpec.hs b/test/Quasar/AwaitableSpec.hs index eb5d3c1..1e4666b 100644 --- a/test/Quasar/AwaitableSpec.hs +++ b/test/Quasar/AwaitableSpec.hs @@ -10,7 +10,7 @@ spec :: Spec spec = parallel $ do describe "Awaitable" $ do it "can await pure values" $ do - awaitIO $ (pure () :: Awaitable ()) :: IO () + await $ (pure () :: Awaitable ()) :: IO () describe "AsyncVar" $ do it "can be created" $ do @@ -25,7 +25,7 @@ spec = parallel $ do avar <- newAsyncVar :: IO (AsyncVar ()) putAsyncVar_ avar () - awaitIO avar + await avar it "can be awaited when completed asynchronously" $ do avar <- newAsyncVar :: IO (AsyncVar ()) @@ -33,12 +33,12 @@ spec = parallel $ do threadDelay 100000 putAsyncVar_ avar () - awaitIO avar + await avar describe "awaitAny" $ do it "works with completed awaitables" $ do - awaitIO (awaitAny2 (pure () :: Awaitable ()) (pure () :: Awaitable ())) :: IO () + await (awaitAny2 (pure () :: Awaitable ()) (pure () :: Awaitable ())) :: IO () it "can be completed later" $ do avar1 <- newAsyncVar :: IO (AsyncVar ()) @@ -46,7 +46,7 @@ spec = parallel $ do void $ forkIO $ do threadDelay 100000 putAsyncVar_ avar1 () - awaitIO (awaitAny2 avar1 avar2) + await (awaitAny2 avar1 avar2) it "can be completed later by the second parameter" $ do avar1 <- newAsyncVar :: IO (AsyncVar ()) @@ -54,4 +54,4 @@ spec = parallel $ do void $ forkIO $ do threadDelay 100000 putAsyncVar_ avar2 () - awaitIO (awaitAny2 avar1 avar2) + await (awaitAny2 avar1 avar2) diff --git a/test/Quasar/DisposableSpec.hs b/test/Quasar/DisposableSpec.hs index b7c4bde..8d57ac5 100644 --- a/test/Quasar/DisposableSpec.hs +++ b/test/Quasar/DisposableSpec.hs @@ -9,56 +9,59 @@ import Quasar.Awaitable import Quasar.Disposable data TestException = TestException - deriving (Eq, Show) + deriving stock (Eq, Show) instance Exception TestException +io :: IO a -> IO a +io = id + spec :: Spec spec = parallel $ do describe "Disposable" $ do describe "noDisposable" $ do it "can be disposed" $ do - awaitIO =<< dispose noDisposable + await =<< dispose noDisposable it "can be awaited" $ do - awaitIO (isDisposed noDisposable) + await (isDisposed noDisposable) pure () :: IO () describe "newDisposable" $ do it "signals it's disposed state" $ do disposable <- newDisposable $ pure $ pure () void $ forkIO $ threadDelay 100000 >> disposeIO disposable - awaitIO (isDisposed disposable) + await (isDisposed disposable) pure () :: IO () it "can be disposed multiple times" $ do disposable <- newDisposable $ pure $ pure () disposeIO disposable disposeIO disposable - awaitIO (isDisposed disposable) + await (isDisposed disposable) it "can be disposed in parallel" $ do disposable <- newDisposable $ pure () <$ threadDelay 100000 void $ forkIO $ disposeIO disposable disposeIO disposable - awaitIO (isDisposed disposable) + await (isDisposed disposable) describe "ResourceManager" $ do - it "can be created" $ do + it "can be created" $ io do void unsafeNewResourceManager it "can be created and disposed" $ do resourceManager <- unsafeNewResourceManager - awaitIO =<< dispose resourceManager + await =<< dispose resourceManager - it "can be created and disposed" $ do + it "can be created and disposed" $ io do withResourceManager \_ -> pure () it "can be created and disposed with a delay" $ do withResourceManager \_ -> threadDelay 100000 - it "can \"dispose\" a noDisposable" $ do + it "can \"dispose\" a noDisposable" $ io do withResourceManager \resourceManager -> do attachDisposable resourceManager noDisposable @@ -80,13 +83,13 @@ spec = parallel $ do it "can call a trivial dispose action" $ do withResourceManager \resourceManager -> - attachDisposeAction resourceManager $ pure $ pure () + attachDisposeAction_ resourceManager $ pure $ pure () pure () :: IO () it "can call a dispose action" $ do withResourceManager \resourceManager -> do avar <- newAsyncVar :: IO (AsyncVar ()) - attachDisposeAction resourceManager $ toAwaitable avar <$ putAsyncVar_ avar () + attachDisposeAction_ resourceManager $ toAwaitable avar <$ putAsyncVar_ avar () pure () :: IO () it "re-throws an exception from a dispose action" $ do -- GitLab