From def13f747ae035a08aa62bc5ce175dcbbdcb7254 Mon Sep 17 00:00:00 2001 From: Jens Nolte <git@queezle.net> Date: Sun, 22 Aug 2021 23:50:44 +0200 Subject: [PATCH] Move async/await-related code to Quasar.Async module --- quasar.cabal | 3 +- src/Quasar/Async.hs | 184 ++++++++++++++++++ src/Quasar/Core.hs | 180 +---------------- src/Quasar/Observable.hs | 1 + .../Quasar/{AsyncSpec.hs => AwaitableSpec.hs} | 4 +- 5 files changed, 191 insertions(+), 181 deletions(-) create mode 100644 src/Quasar/Async.hs rename test/Quasar/{AsyncSpec.hs => AwaitableSpec.hs} (97%) diff --git a/quasar.cabal b/quasar.cabal index 50fd701..20b4009 100644 --- a/quasar.cabal +++ b/quasar.cabal @@ -78,6 +78,7 @@ library transformers, unordered-containers, exposed-modules: + Quasar.Async Quasar.Awaitable Quasar.Core Quasar.Disposable @@ -101,7 +102,7 @@ test-suite quasar-test unordered-containers, main-is: Spec.hs other-modules: - Quasar.AsyncSpec + Quasar.AwaitableSpec Quasar.ObservableSpec Quasar.Observable.ObservableHashMapSpec Quasar.Observable.ObservablePrioritySpec diff --git a/src/Quasar/Async.hs b/src/Quasar/Async.hs new file mode 100644 index 0000000..0f9bfa9 --- /dev/null +++ b/src/Quasar/Async.hs @@ -0,0 +1,184 @@ +module Quasar.Async ( + -- * Async/await + MonadAsync(..), + AsyncIO, + async, + await, + awaitResult, + + -- * Task + Task, + cancelTask, + cancelTaskIO, + toTask, + completedTask, + successfulTask, + failedTask, + + -- * AsyncManager + AsyncManager, + AsyncManagerConfiguraiton(..), + withAsyncManager, + withDefaultAsyncManager, + withUnlimitedAsyncManager, + newAsyncManager, + defaultAsyncManagerConfiguration, + unlimitedAsyncManagerConfiguration, +) where + +import Control.Concurrent (ThreadId, forkIOWithUnmask) +import Control.Concurrent.STM +import Control.Monad.Catch +import Control.Monad.Reader +import Data.HashSet +import Quasar.Awaitable +import Quasar.Core +import Quasar.Prelude + + +-- | A monad for actions that run on a thread bound to a `AsyncManager`. +newtype AsyncIO a = AsyncIO (ReaderT AsyncManager IO a) + deriving newtype (Functor, Applicative, Monad, MonadIO, MonadThrow, MonadCatch, MonadMask) + + +-- | Run the synchronous part of an `AsyncIO` and then return an `Awaitable` that can be used to wait for completion of the synchronous part. +async :: MonadAsync m => AsyncIO r -> m (Task r) +async action = asyncWithUnmask (\unmask -> unmask action) + +-- | Run the synchronous part of an `AsyncIO` and then return an `Awaitable` that can be used to wait for completion of the synchronous part. +asyncWithUnmask :: MonadAsync m => ((forall a. AsyncIO a -> AsyncIO a) -> AsyncIO r) -> m (Task r) +-- TODO resource limits +asyncWithUnmask action = do + asyncManager <- askAsyncManager + resultVar <- newAsyncVar + liftIO $ mask_ $ do + void $ forkIOWithUnmask $ \unmask -> do + result <- try $ runOnAsyncManager asyncManager (action (liftUnmask unmask)) + putAsyncVarEither_ resultVar result + pure $ Task (toAwaitable resultVar) + +liftUnmask :: (IO a -> IO a) -> AsyncIO a -> AsyncIO a +liftUnmask unmask action = do + asyncManager <- askAsyncManager + liftIO $ unmask $ runOnAsyncManager asyncManager action + +await :: IsAwaitable r a => a -> AsyncIO r +-- TODO resource limits +await = liftIO . awaitIO + + +class MonadIO m => MonadAsync m where + askAsyncManager :: m AsyncManager + +instance MonadAsync AsyncIO where + askAsyncManager = AsyncIO ask + +instance MonadIO m => MonadAsync (ReaderT AsyncManager m) where + askAsyncManager = ask + + +awaitResult :: IsAwaitable r a => AsyncIO a -> AsyncIO r +awaitResult = (await =<<) + +data AsyncManager = AsyncManager { + resourceManager :: ResourceManager, + configuration :: AsyncManagerConfiguraiton, + threads :: TVar (HashSet ThreadId) +} + +instance IsDisposable AsyncManager where + toDisposable = undefined + +instance HasResourceManager AsyncManager where + getResourceManager = resourceManager + + +-- | A task that is running asynchronously. It has a result and can fail. +-- The result (or exception) can be aquired by using the `IsAwaitable` class (e.g. by calling `await` or `awaitIO`). +-- It might be possible to cancel the task by using the `IsDisposable` class if the operation has not been completed. +-- If the result is no longer required the task should be cancelled, to avoid leaking memory. +newtype Task r = Task (Awaitable r) + +instance IsAwaitable r (Task r) where + toAwaitable (Task awaitable) = awaitable + +instance IsDisposable (Task r) where + toDisposable = undefined + +instance Functor Task where + fmap fn (Task x) = Task (fn <$> x) + +instance Applicative Task where + pure = Task . pure + liftA2 fn (Task fx) (Task fy) = Task $ liftA2 fn fx fy + +cancelTask :: Task r -> IO (Awaitable ()) +cancelTask = dispose + +cancelTaskIO :: Task r -> IO () +cancelTaskIO = awaitIO <=< dispose + +-- | Creates an `Task` from an `Awaitable`. +-- The resulting task only depends on an external resource, so disposing it has no effect. +toTask :: IsAwaitable r a => a -> Task r +toTask = Task . toAwaitable + +completedTask :: Either SomeException r -> Task r +completedTask = toTask . completedAwaitable + +-- | Alias for `pure` +successfulTask :: r -> Task r +successfulTask = pure + +failedTask :: SomeException -> Task r +failedTask = toTask . failedAwaitable + + + +data CancelTask = CancelTask + deriving stock Show +instance Exception CancelTask where + +data CancelledTask = CancelledTask + deriving stock Show +instance Exception CancelledTask where + + +data AsyncManagerConfiguraiton = AsyncManagerConfiguraiton { + maxThreads :: Maybe Int +} + +defaultAsyncManagerConfiguration :: AsyncManagerConfiguraiton +defaultAsyncManagerConfiguration = AsyncManagerConfiguraiton { + maxThreads = Just 1 +} + +unlimitedAsyncManagerConfiguration :: AsyncManagerConfiguraiton +unlimitedAsyncManagerConfiguration = AsyncManagerConfiguraiton { + maxThreads = Nothing +} + +withAsyncManager :: AsyncManagerConfiguraiton -> AsyncIO r -> IO r +withAsyncManager configuration = bracket (newAsyncManager configuration) disposeAsyncManager . flip runOnAsyncManager + +runOnAsyncManager :: AsyncManager -> AsyncIO r -> IO r +runOnAsyncManager asyncManager (AsyncIO action) = runReaderT action asyncManager + +withDefaultAsyncManager :: AsyncIO a -> IO a +withDefaultAsyncManager = withAsyncManager defaultAsyncManagerConfiguration + +withUnlimitedAsyncManager :: AsyncIO a -> IO a +withUnlimitedAsyncManager = withAsyncManager unlimitedAsyncManagerConfiguration + +newAsyncManager :: AsyncManagerConfiguraiton -> IO AsyncManager +newAsyncManager configuration = do + threads <- newTVarIO mempty + pure AsyncManager { + configuration, + threads + } + +disposeAsyncManager :: AsyncManager -> IO () +-- TODO resource management +disposeAsyncManager = const (pure ()) + diff --git a/src/Quasar/Core.hs b/src/Quasar/Core.hs index 64f684e..4faae87 100644 --- a/src/Quasar/Core.hs +++ b/src/Quasar/Core.hs @@ -1,30 +1,4 @@ module Quasar.Core ( - -- * AsyncManager - AsyncManager, - AsyncManagerConfiguraiton(..), - MonadAsync(..), - withAsyncManager, - withDefaultAsyncManager, - withUnlimitedAsyncManager, - newAsyncManager, - defaultAsyncManagerConfiguration, - unlimitedAsyncManagerConfiguration, - - -- * Task - Task, - cancelTask, - cancelTaskIO, - toTask, - completedTask, - successfulTask, - failedTask, - - -- * AsyncIO - AsyncIO, - async, - await, - awaitResult, - -- * Disposable IsDisposable(..), Disposable, @@ -35,6 +9,7 @@ module Quasar.Core ( -- ** ResourceManager ResourceManager, + HasResourceManager(..), newResourceManager, disposeEventually, attachDisposable, @@ -42,164 +17,13 @@ module Quasar.Core ( attachDisposeAction_, ) where -import Control.Concurrent (ThreadId, forkIOWithUnmask) import Control.Concurrent.STM import Control.Monad.Catch import Control.Monad.Reader -import Data.HashSet import Quasar.Awaitable import Quasar.Prelude - --- | A monad for actions that run on a thread bound to a `AsyncManager`. -newtype AsyncIO a = AsyncIO (ReaderT AsyncManager IO a) - deriving newtype (Functor, Applicative, Monad, MonadIO, MonadThrow, MonadCatch, MonadMask) - - --- | Run the synchronous part of an `AsyncIO` and then return an `Awaitable` that can be used to wait for completion of the synchronous part. -async :: MonadAsync m => AsyncIO r -> m (Task r) -async action = asyncWithUnmask (\unmask -> unmask action) - --- | Run the synchronous part of an `AsyncIO` and then return an `Awaitable` that can be used to wait for completion of the synchronous part. -asyncWithUnmask :: MonadAsync m => ((forall a. AsyncIO a -> AsyncIO a) -> AsyncIO r) -> m (Task r) --- TODO resource limits -asyncWithUnmask action = do - asyncManager <- askAsyncManager - resultVar <- newAsyncVar - liftIO $ mask_ $ do - void $ forkIOWithUnmask $ \unmask -> do - result <- try $ runOnAsyncManager asyncManager (action (liftUnmask unmask)) - putAsyncVarEither_ resultVar result - pure $ Task (toAwaitable resultVar) - -liftUnmask :: (IO a -> IO a) -> AsyncIO a -> AsyncIO a -liftUnmask unmask action = do - asyncManager <- askAsyncManager - liftIO $ unmask $ runOnAsyncManager asyncManager action - -await :: IsAwaitable r a => a -> AsyncIO r --- TODO resource limits -await = liftIO . awaitIO - - -class MonadIO m => MonadAsync m where - askAsyncManager :: m AsyncManager - -instance MonadAsync AsyncIO where - askAsyncManager = AsyncIO ask - -instance MonadIO m => MonadAsync (ReaderT AsyncManager m) where - askAsyncManager = ask - - -awaitResult :: IsAwaitable r a => AsyncIO a -> AsyncIO r -awaitResult = (await =<<) - -data AsyncManager = AsyncManager { - resourceManager :: ResourceManager, - configuration :: AsyncManagerConfiguraiton, - threads :: TVar (HashSet ThreadId) -} - -instance IsDisposable AsyncManager where - toDisposable = undefined - -instance HasResourceManager AsyncManager where - getResourceManager = resourceManager - - --- | A task that is running asynchronously. It has a result and can fail. --- The result (or exception) can be aquired by using the `IsAwaitable` class (e.g. by calling `await` or `awaitIO`). --- It might be possible to cancel the task by using the `IsDisposable` class if the operation has not been completed. --- If the result is no longer required the task should be cancelled, to avoid leaking memory. -newtype Task r = Task (Awaitable r) - -instance IsAwaitable r (Task r) where - toAwaitable (Task awaitable) = awaitable - -instance IsDisposable (Task r) where - toDisposable = undefined - -instance Functor Task where - fmap fn (Task x) = Task (fn <$> x) - -instance Applicative Task where - pure = Task . pure - liftA2 fn (Task fx) (Task fy) = Task $ liftA2 fn fx fy - -cancelTask :: Task r -> IO (Awaitable ()) -cancelTask = dispose - -cancelTaskIO :: Task r -> IO () -cancelTaskIO = awaitIO <=< dispose - --- | Creates an `Task` from an `Awaitable`. --- The resulting task only depends on an external resource, so disposing it has no effect. -toTask :: IsAwaitable r a => a -> Task r -toTask = Task . toAwaitable - -completedTask :: Either SomeException r -> Task r -completedTask = toTask . completedAwaitable - --- | Alias for `pure` -successfulTask :: r -> Task r -successfulTask = pure - -failedTask :: SomeException -> Task r -failedTask = toTask . failedAwaitable - - - -data CancelTask = CancelTask - deriving stock Show -instance Exception CancelTask where - -data CancelledTask = CancelledTask - deriving stock Show -instance Exception CancelledTask where - - -data AsyncManagerConfiguraiton = AsyncManagerConfiguraiton { - maxThreads :: Maybe Int -} - -defaultAsyncManagerConfiguration :: AsyncManagerConfiguraiton -defaultAsyncManagerConfiguration = AsyncManagerConfiguraiton { - maxThreads = Just 1 -} - -unlimitedAsyncManagerConfiguration :: AsyncManagerConfiguraiton -unlimitedAsyncManagerConfiguration = AsyncManagerConfiguraiton { - maxThreads = Nothing -} - -withAsyncManager :: AsyncManagerConfiguraiton -> AsyncIO r -> IO r -withAsyncManager configuration = bracket (newAsyncManager configuration) disposeAsyncManager . flip runOnAsyncManager - -runOnAsyncManager :: AsyncManager -> AsyncIO r -> IO r -runOnAsyncManager asyncManager (AsyncIO action) = runReaderT action asyncManager - -withDefaultAsyncManager :: AsyncIO a -> IO a -withDefaultAsyncManager = withAsyncManager defaultAsyncManagerConfiguration - -withUnlimitedAsyncManager :: AsyncIO a -> IO a -withUnlimitedAsyncManager = withAsyncManager unlimitedAsyncManagerConfiguration - -newAsyncManager :: AsyncManagerConfiguraiton -> IO AsyncManager -newAsyncManager configuration = do - threads <- newTVarIO mempty - pure AsyncManager { - configuration, - threads - } - -disposeAsyncManager :: AsyncManager -> IO () --- TODO resource management -disposeAsyncManager = const (pure ()) - - - -- * Disposable class IsDisposable a where @@ -324,7 +148,7 @@ disposeEventually _resourceManager disposable = liftIO $ do Nothing -> undefined -- TODO register on resourceManager attachDisposable :: (IsDisposable a, MonadIO m) => ResourceManager -> a -> m () -attachDisposable _resourceManager disposable = undefined +attachDisposable _resourceManager disposable = liftIO undefined -- | Creates an `Disposable` that is bound to a ResourceManager. It will automatically be disposed when the resource manager is disposed. attachDisposeAction :: MonadIO m => ResourceManager -> IO (Awaitable ()) -> m Disposable diff --git a/src/Quasar/Observable.hs b/src/Quasar/Observable.hs index 46130c4..1da3736 100644 --- a/src/Quasar/Observable.hs +++ b/src/Quasar/Observable.hs @@ -38,6 +38,7 @@ import Control.Monad.Except import Control.Monad.Trans.Maybe import Data.HashMap.Strict qualified as HM import Data.Unique +import Quasar.Async import Quasar.Awaitable import Quasar.Core import Quasar.Prelude diff --git a/test/Quasar/AsyncSpec.hs b/test/Quasar/AwaitableSpec.hs similarity index 97% rename from test/Quasar/AsyncSpec.hs rename to test/Quasar/AwaitableSpec.hs index 0ad11cc..77c9995 100644 --- a/test/Quasar/AsyncSpec.hs +++ b/test/Quasar/AwaitableSpec.hs @@ -1,12 +1,12 @@ -module Quasar.AsyncSpec (spec) where +module Quasar.AwaitableSpec (spec) where import Control.Concurrent import Control.Monad (void) import Control.Monad.IO.Class import Prelude import Test.Hspec +import Quasar.Async import Quasar.Awaitable -import Quasar.Core import System.Timeout spec :: Spec -- GitLab