diff --git a/quasar.cabal b/quasar.cabal index 6a2e1cae11730e74bc6d28e09e21fd1cef1a5076..ffbd084cb23c3e74d112d490f9c4efa5bb2edbb8 100644 --- a/quasar.cabal +++ b/quasar.cabal @@ -47,10 +47,10 @@ common shared-properties default-language: Haskell2010 ghc-options: -Weverything + -Wno-all-missed-specialisations -Wno-missing-safe-haskell-mode -Wno-missing-import-lists -Wno-unsafe - -Wno-all-missed-specialisations -Werror=incomplete-patterns -Werror=missing-methods diff --git a/src/Quasar/Core.hs b/src/Quasar/Core.hs index 42fa5f11154e28f931513c6a9849afed187879aa..7477f443022fd94bfa2117565849bef41bfa30d9 100644 --- a/src/Quasar/Core.hs +++ b/src/Quasar/Core.hs @@ -1,14 +1,28 @@ module Quasar.Core ( + -- * ResourceManager + ResourceManager, + ResourceManagerConfiguraiton(..), + HasResourceManager(..), + withResourceManager, + withDefaultResourceManager, + withUnlimitedResourceManager, + newResourceManager, + disposeResourceManager, + + -- * AsyncTask + AsyncTask, + cancelTask, + toAsyncTask, + successfulTask, + -- * AsyncIO AsyncIO, async, await, - askPool, - runAsyncIO, awaitResult, ) where -import Control.Concurrent (ThreadId, forkIO, forkIOWithUnmask, myThreadId) +import Control.Concurrent (ThreadId, forkIOWithUnmask, myThreadId) import Control.Concurrent.STM import Control.Monad.Catch import Control.Monad.Reader @@ -18,62 +32,85 @@ import Quasar.Awaitable import Quasar.Prelude --- * AsyncIO - - -newtype AsyncT m a = AsyncT (ReaderT Pool m a) - deriving newtype (MonadTrans, Functor, Applicative, Monad, MonadIO, MonadThrow, MonadCatch, MonadMask) -type AsyncIO = AsyncT IO +-- | A monad for actions that run on a thread bound to a `ResourceManager`. +newtype AsyncIO a = AsyncIO (ReaderT ResourceManager IO a) + deriving newtype (Functor, Applicative, Monad, MonadIO, MonadThrow, MonadCatch, MonadMask) -- | Run the synchronous part of an `AsyncIO` and then return an `Awaitable` that can be used to wait for completion of the synchronous part. -async :: AsyncIO r -> AsyncIO (Awaitable r) +async :: HasResourceManager m => AsyncIO r -> m (AsyncTask r) async action = asyncWithUnmask (\unmask -> unmask action) -- | Run the synchronous part of an `AsyncIO` and then return an `Awaitable` that can be used to wait for completion of the synchronous part. -asyncWithUnmask :: ((forall a. AsyncIO a -> AsyncIO a) -> AsyncIO r) -> AsyncIO (Awaitable r) +asyncWithUnmask :: HasResourceManager m => ((forall a. AsyncIO a -> AsyncIO a) -> AsyncIO r) -> m (AsyncTask r) -- TODO resource limits -asyncWithUnmask action = mask_ $ do - pool <- askPool +asyncWithUnmask action = do + resourceManager <- askResourceManager resultVar <- newAsyncVar - liftIO $ forkIOWithUnmask $ \unmask -> do - result <- try $ runOnPool pool (action (liftUnmask unmask)) - putAsyncVarEither_ resultVar result - pure $ toAwaitable resultVar + liftIO $ mask_ $ do + void $ forkIOWithUnmask $ \unmask -> do + result <- try $ runOnResourceManager resourceManager (action (liftUnmask unmask)) + putAsyncVarEither_ resultVar result + pure $ AsyncTask (toAwaitable resultVar) liftUnmask :: (IO a -> IO a) -> AsyncIO a -> AsyncIO a liftUnmask unmask action = do - pool <- askPool - liftIO $ unmask $ runOnPool pool action - -askPool :: AsyncIO Pool -askPool = AsyncT ask + resourceManager <- askResourceManager + liftIO $ unmask $ runOnResourceManager resourceManager action await :: IsAwaitable r a => a -> AsyncIO r -- TODO resource limits await = liftIO . awaitIO --- | Run an `AsyncIO` to completion and return the result. -runAsyncIO :: AsyncIO r -> IO r -runAsyncIO = withDefaultPool +class MonadIO m => HasResourceManager m where + askResourceManager :: m ResourceManager +instance HasResourceManager AsyncIO where + askResourceManager = AsyncIO ask -awaitResult :: AsyncIO (Awaitable r) -> AsyncIO r +awaitResult :: IsAwaitable r a => AsyncIO a -> AsyncIO r awaitResult = (await =<<) -- TODO rename to ResourceManager -data Pool = Pool { - configuration :: PoolConfiguraiton, +data ResourceManager = ResourceManager { + configuration :: ResourceManagerConfiguraiton, threads :: TVar (HashSet ThreadId) } + +-- | A task that is running asynchronously. It has a result and can fail. +-- The result (or exception) can be aquired by using the `Awaitable` class (e.g. by calling `await` or `awaitIO`). +-- It might be possible to cancel the task by using the `Disposable` class if the operation has not been completed. +-- If the result is no longer required the task should be cancelled, to avoid leaking memory. newtype AsyncTask r = AsyncTask (Awaitable r) + instance IsAwaitable r (AsyncTask r) where toAwaitable (AsyncTask awaitable) = awaitable +instance Functor AsyncTask where + fmap fn (AsyncTask x) = AsyncTask (fn <$> x) + +instance Applicative AsyncTask where + pure = AsyncTask . pure + liftA2 fn (AsyncTask fx) (AsyncTask fy) = AsyncTask $ liftA2 fn fx fy + +cancelTask :: AsyncTask r -> IO () +-- TODO resource management +cancelTask = const (pure ()) + +-- | Creates an `AsyncTask` from an `Awaitable`. +-- The resulting task only depends on an external resource, so disposing it has no effect. +toAsyncTask :: Awaitable r -> AsyncTask r +toAsyncTask = AsyncTask + +successfulTask :: r -> AsyncTask r +successfulTask = AsyncTask . successfulAwaitable + + + data CancelTask = CancelTask deriving stock Show instance Exception CancelTask where @@ -83,29 +120,40 @@ data CancelledTask = CancelledTask instance Exception CancelledTask where -data PoolConfiguraiton = PoolConfiguraiton +data ResourceManagerConfiguraiton = ResourceManagerConfiguraiton { + maxThreads :: Maybe Int +} + +defaultResourceManagerConfiguration :: ResourceManagerConfiguraiton +defaultResourceManagerConfiguration = ResourceManagerConfiguraiton { + maxThreads = Just 1 +} -defaultPoolConfiguration :: PoolConfiguraiton -defaultPoolConfiguration = PoolConfiguraiton +unlimitedResourceManagerConfiguration :: ResourceManagerConfiguraiton +unlimitedResourceManagerConfiguration = ResourceManagerConfiguraiton { + maxThreads = Nothing +} -withPool :: PoolConfiguraiton -> AsyncIO r -> IO r -withPool configuration = bracket (newPool configuration) disposePool . flip runOnPool +withResourceManager :: ResourceManagerConfiguraiton -> AsyncIO r -> IO r +withResourceManager configuration = bracket (newResourceManager configuration) disposeResourceManager . flip runOnResourceManager -runOnPool :: Pool -> AsyncIO r -> IO r -runOnPool pool (AsyncT action) = runReaderT action pool +runOnResourceManager :: ResourceManager -> AsyncIO r -> IO r +runOnResourceManager resourceManager (AsyncIO action) = runReaderT action resourceManager +withDefaultResourceManager :: AsyncIO a -> IO a +withDefaultResourceManager = withResourceManager defaultResourceManagerConfiguration -withDefaultPool :: AsyncIO a -> IO a -withDefaultPool = withPool defaultPoolConfiguration +withUnlimitedResourceManager :: AsyncIO a -> IO a +withUnlimitedResourceManager = withResourceManager unlimitedResourceManagerConfiguration -newPool :: PoolConfiguraiton -> IO Pool -newPool configuration = do +newResourceManager :: ResourceManagerConfiguraiton -> IO ResourceManager +newResourceManager configuration = do threads <- newTVarIO mempty - pure Pool { + pure ResourceManager { configuration, threads } -disposePool :: Pool -> IO () +disposeResourceManager :: ResourceManager -> IO () -- TODO resource management -disposePool = const (pure ()) +disposeResourceManager = const (pure ()) diff --git a/src/Quasar/Disposable.hs b/src/Quasar/Disposable.hs index 33eaaff804e3239dedf2cd0e2d3753d95bf826f0..3e59d75398cca42c0aeb55a2fc6908c29f411add 100644 --- a/src/Quasar/Disposable.hs +++ b/src/Quasar/Disposable.hs @@ -19,7 +19,7 @@ class IsDisposable a where -- | Dispose a resource in the IO monad. disposeIO :: a -> IO () - disposeIO = runAsyncIO . dispose + disposeIO = withDefaultResourceManager . dispose toDisposable :: a -> Disposable toDisposable = mkDisposable . dispose diff --git a/test/Quasar/AsyncSpec.hs b/test/Quasar/AsyncSpec.hs index 5feed6da850a03e7700c4ca2a3225ea9d7615ea1..9c781321906ea9c63059661d9d0106a4959cad7a 100644 --- a/test/Quasar/AsyncSpec.hs +++ b/test/Quasar/AsyncSpec.hs @@ -26,46 +26,46 @@ spec = parallel $ do describe "AsyncIO" $ do it "binds pure operations" $ do - runAsyncIO (pure () >>= \() -> pure ()) + withDefaultResourceManager (pure () >>= \() -> pure ()) it "binds IO actions" $ do m1 <- newEmptyMVar m2 <- newEmptyMVar - runAsyncIO (liftIO (putMVar m1 ()) >>= \() -> liftIO (putMVar m2 ())) + withDefaultResourceManager (liftIO (putMVar m1 ()) >>= \() -> liftIO (putMVar m2 ())) tryTakeMVar m1 `shouldReturn` Just () tryTakeMVar m2 `shouldReturn` Just () it "can continue after awaiting an already finished operation" $ do - runAsyncIO (await =<< async (pure 42 :: AsyncIO Int)) `shouldReturn` 42 + withDefaultResourceManager (await =<< async (pure 42 :: AsyncIO Int)) `shouldReturn` 42 it "can fmap the result of an already finished async" $ do avar <- newAsyncVar :: IO (AsyncVar ()) putAsyncVar_ avar () - runAsyncIO (id <$> await avar) + withDefaultResourceManager (id <$> await avar) it "can fmap the result of an async that is completed later" $ do avar <- newAsyncVar :: IO (AsyncVar ()) void $ forkIO $ do threadDelay 100000 putAsyncVar_ avar () - runAsyncIO (id <$> await avar) + withDefaultResourceManager (id <$> await avar) it "can bind the result of an already finished async" $ do avar <- newAsyncVar :: IO (AsyncVar ()) putAsyncVar_ avar () - runAsyncIO (await avar >>= pure) + withDefaultResourceManager (await avar >>= pure) it "can bind the result of an async that is completed later" $ do avar <- newAsyncVar :: IO (AsyncVar ()) void $ forkIO $ do threadDelay 100000 putAsyncVar_ avar () - runAsyncIO (await avar >>= pure) + withDefaultResourceManager (await avar >>= pure) it "can terminate when encountering an asynchronous exception" $ do never <- newAsyncVar :: IO (AsyncVar ()) - result <- timeout 100000 $ runAsyncIO $ + result <- timeout 100000 $ withDefaultResourceManager $ -- Use bind to create an AsyncIOPlumbing, which is the interesting case that uses `uninterruptibleMask` when run await never >>= pure result `shouldBe` Nothing