Skip to content
Snippets Groups Projects
Commit 21fbd1e0 authored by Jens Nolte's avatar Jens Nolte
Browse files

Update ResourceManager api draft

parent 7167f4ce
No related branches found
No related tags found
No related merge requests found
...@@ -47,10 +47,10 @@ common shared-properties ...@@ -47,10 +47,10 @@ common shared-properties
default-language: Haskell2010 default-language: Haskell2010
ghc-options: ghc-options:
-Weverything -Weverything
-Wno-all-missed-specialisations
-Wno-missing-safe-haskell-mode -Wno-missing-safe-haskell-mode
-Wno-missing-import-lists -Wno-missing-import-lists
-Wno-unsafe -Wno-unsafe
-Wno-all-missed-specialisations
-Werror=incomplete-patterns -Werror=incomplete-patterns
-Werror=missing-methods -Werror=missing-methods
......
module Quasar.Core ( module Quasar.Core (
-- * ResourceManager
ResourceManager,
ResourceManagerConfiguraiton(..),
HasResourceManager(..),
withResourceManager,
withDefaultResourceManager,
withUnlimitedResourceManager,
newResourceManager,
disposeResourceManager,
-- * AsyncTask
AsyncTask,
cancelTask,
toAsyncTask,
successfulTask,
-- * AsyncIO -- * AsyncIO
AsyncIO, AsyncIO,
async, async,
await, await,
askPool,
runAsyncIO,
awaitResult, awaitResult,
) where ) where
import Control.Concurrent (ThreadId, forkIO, forkIOWithUnmask, myThreadId) import Control.Concurrent (ThreadId, forkIOWithUnmask, myThreadId)
import Control.Concurrent.STM import Control.Concurrent.STM
import Control.Monad.Catch import Control.Monad.Catch
import Control.Monad.Reader import Control.Monad.Reader
...@@ -18,62 +32,85 @@ import Quasar.Awaitable ...@@ -18,62 +32,85 @@ import Quasar.Awaitable
import Quasar.Prelude import Quasar.Prelude
-- * AsyncIO
newtype AsyncT m a = AsyncT (ReaderT Pool m a)
deriving newtype (MonadTrans, Functor, Applicative, Monad, MonadIO, MonadThrow, MonadCatch, MonadMask)
type AsyncIO = AsyncT IO -- | A monad for actions that run on a thread bound to a `ResourceManager`.
newtype AsyncIO a = AsyncIO (ReaderT ResourceManager IO a)
deriving newtype (Functor, Applicative, Monad, MonadIO, MonadThrow, MonadCatch, MonadMask)
-- | Run the synchronous part of an `AsyncIO` and then return an `Awaitable` that can be used to wait for completion of the synchronous part. -- | Run the synchronous part of an `AsyncIO` and then return an `Awaitable` that can be used to wait for completion of the synchronous part.
async :: AsyncIO r -> AsyncIO (Awaitable r) async :: HasResourceManager m => AsyncIO r -> m (AsyncTask r)
async action = asyncWithUnmask (\unmask -> unmask action) async action = asyncWithUnmask (\unmask -> unmask action)
-- | Run the synchronous part of an `AsyncIO` and then return an `Awaitable` that can be used to wait for completion of the synchronous part. -- | Run the synchronous part of an `AsyncIO` and then return an `Awaitable` that can be used to wait for completion of the synchronous part.
asyncWithUnmask :: ((forall a. AsyncIO a -> AsyncIO a) -> AsyncIO r) -> AsyncIO (Awaitable r) asyncWithUnmask :: HasResourceManager m => ((forall a. AsyncIO a -> AsyncIO a) -> AsyncIO r) -> m (AsyncTask r)
-- TODO resource limits -- TODO resource limits
asyncWithUnmask action = mask_ $ do asyncWithUnmask action = do
pool <- askPool resourceManager <- askResourceManager
resultVar <- newAsyncVar resultVar <- newAsyncVar
liftIO $ forkIOWithUnmask $ \unmask -> do liftIO $ mask_ $ do
result <- try $ runOnPool pool (action (liftUnmask unmask)) void $ forkIOWithUnmask $ \unmask -> do
putAsyncVarEither_ resultVar result result <- try $ runOnResourceManager resourceManager (action (liftUnmask unmask))
pure $ toAwaitable resultVar putAsyncVarEither_ resultVar result
pure $ AsyncTask (toAwaitable resultVar)
liftUnmask :: (IO a -> IO a) -> AsyncIO a -> AsyncIO a liftUnmask :: (IO a -> IO a) -> AsyncIO a -> AsyncIO a
liftUnmask unmask action = do liftUnmask unmask action = do
pool <- askPool resourceManager <- askResourceManager
liftIO $ unmask $ runOnPool pool action liftIO $ unmask $ runOnResourceManager resourceManager action
askPool :: AsyncIO Pool
askPool = AsyncT ask
await :: IsAwaitable r a => a -> AsyncIO r await :: IsAwaitable r a => a -> AsyncIO r
-- TODO resource limits -- TODO resource limits
await = liftIO . awaitIO await = liftIO . awaitIO
-- | Run an `AsyncIO` to completion and return the result.
runAsyncIO :: AsyncIO r -> IO r
runAsyncIO = withDefaultPool
class MonadIO m => HasResourceManager m where
askResourceManager :: m ResourceManager
instance HasResourceManager AsyncIO where
askResourceManager = AsyncIO ask
awaitResult :: AsyncIO (Awaitable r) -> AsyncIO r awaitResult :: IsAwaitable r a => AsyncIO a -> AsyncIO r
awaitResult = (await =<<) awaitResult = (await =<<)
-- TODO rename to ResourceManager -- TODO rename to ResourceManager
data Pool = Pool { data ResourceManager = ResourceManager {
configuration :: PoolConfiguraiton, configuration :: ResourceManagerConfiguraiton,
threads :: TVar (HashSet ThreadId) threads :: TVar (HashSet ThreadId)
} }
-- | A task that is running asynchronously. It has a result and can fail.
-- The result (or exception) can be aquired by using the `Awaitable` class (e.g. by calling `await` or `awaitIO`).
-- It might be possible to cancel the task by using the `Disposable` class if the operation has not been completed.
-- If the result is no longer required the task should be cancelled, to avoid leaking memory.
newtype AsyncTask r = AsyncTask (Awaitable r) newtype AsyncTask r = AsyncTask (Awaitable r)
instance IsAwaitable r (AsyncTask r) where instance IsAwaitable r (AsyncTask r) where
toAwaitable (AsyncTask awaitable) = awaitable toAwaitable (AsyncTask awaitable) = awaitable
instance Functor AsyncTask where
fmap fn (AsyncTask x) = AsyncTask (fn <$> x)
instance Applicative AsyncTask where
pure = AsyncTask . pure
liftA2 fn (AsyncTask fx) (AsyncTask fy) = AsyncTask $ liftA2 fn fx fy
cancelTask :: AsyncTask r -> IO ()
-- TODO resource management
cancelTask = const (pure ())
-- | Creates an `AsyncTask` from an `Awaitable`.
-- The resulting task only depends on an external resource, so disposing it has no effect.
toAsyncTask :: Awaitable r -> AsyncTask r
toAsyncTask = AsyncTask
successfulTask :: r -> AsyncTask r
successfulTask = AsyncTask . successfulAwaitable
data CancelTask = CancelTask data CancelTask = CancelTask
deriving stock Show deriving stock Show
instance Exception CancelTask where instance Exception CancelTask where
...@@ -83,29 +120,40 @@ data CancelledTask = CancelledTask ...@@ -83,29 +120,40 @@ data CancelledTask = CancelledTask
instance Exception CancelledTask where instance Exception CancelledTask where
data PoolConfiguraiton = PoolConfiguraiton data ResourceManagerConfiguraiton = ResourceManagerConfiguraiton {
maxThreads :: Maybe Int
}
defaultResourceManagerConfiguration :: ResourceManagerConfiguraiton
defaultResourceManagerConfiguration = ResourceManagerConfiguraiton {
maxThreads = Just 1
}
defaultPoolConfiguration :: PoolConfiguraiton unlimitedResourceManagerConfiguration :: ResourceManagerConfiguraiton
defaultPoolConfiguration = PoolConfiguraiton unlimitedResourceManagerConfiguration = ResourceManagerConfiguraiton {
maxThreads = Nothing
}
withPool :: PoolConfiguraiton -> AsyncIO r -> IO r withResourceManager :: ResourceManagerConfiguraiton -> AsyncIO r -> IO r
withPool configuration = bracket (newPool configuration) disposePool . flip runOnPool withResourceManager configuration = bracket (newResourceManager configuration) disposeResourceManager . flip runOnResourceManager
runOnPool :: Pool -> AsyncIO r -> IO r runOnResourceManager :: ResourceManager -> AsyncIO r -> IO r
runOnPool pool (AsyncT action) = runReaderT action pool runOnResourceManager resourceManager (AsyncIO action) = runReaderT action resourceManager
withDefaultResourceManager :: AsyncIO a -> IO a
withDefaultResourceManager = withResourceManager defaultResourceManagerConfiguration
withDefaultPool :: AsyncIO a -> IO a withUnlimitedResourceManager :: AsyncIO a -> IO a
withDefaultPool = withPool defaultPoolConfiguration withUnlimitedResourceManager = withResourceManager unlimitedResourceManagerConfiguration
newPool :: PoolConfiguraiton -> IO Pool newResourceManager :: ResourceManagerConfiguraiton -> IO ResourceManager
newPool configuration = do newResourceManager configuration = do
threads <- newTVarIO mempty threads <- newTVarIO mempty
pure Pool { pure ResourceManager {
configuration, configuration,
threads threads
} }
disposePool :: Pool -> IO () disposeResourceManager :: ResourceManager -> IO ()
-- TODO resource management -- TODO resource management
disposePool = const (pure ()) disposeResourceManager = const (pure ())
...@@ -19,7 +19,7 @@ class IsDisposable a where ...@@ -19,7 +19,7 @@ class IsDisposable a where
-- | Dispose a resource in the IO monad. -- | Dispose a resource in the IO monad.
disposeIO :: a -> IO () disposeIO :: a -> IO ()
disposeIO = runAsyncIO . dispose disposeIO = withDefaultResourceManager . dispose
toDisposable :: a -> Disposable toDisposable :: a -> Disposable
toDisposable = mkDisposable . dispose toDisposable = mkDisposable . dispose
......
...@@ -26,46 +26,46 @@ spec = parallel $ do ...@@ -26,46 +26,46 @@ spec = parallel $ do
describe "AsyncIO" $ do describe "AsyncIO" $ do
it "binds pure operations" $ do it "binds pure operations" $ do
runAsyncIO (pure () >>= \() -> pure ()) withDefaultResourceManager (pure () >>= \() -> pure ())
it "binds IO actions" $ do it "binds IO actions" $ do
m1 <- newEmptyMVar m1 <- newEmptyMVar
m2 <- newEmptyMVar m2 <- newEmptyMVar
runAsyncIO (liftIO (putMVar m1 ()) >>= \() -> liftIO (putMVar m2 ())) withDefaultResourceManager (liftIO (putMVar m1 ()) >>= \() -> liftIO (putMVar m2 ()))
tryTakeMVar m1 `shouldReturn` Just () tryTakeMVar m1 `shouldReturn` Just ()
tryTakeMVar m2 `shouldReturn` Just () tryTakeMVar m2 `shouldReturn` Just ()
it "can continue after awaiting an already finished operation" $ do it "can continue after awaiting an already finished operation" $ do
runAsyncIO (await =<< async (pure 42 :: AsyncIO Int)) `shouldReturn` 42 withDefaultResourceManager (await =<< async (pure 42 :: AsyncIO Int)) `shouldReturn` 42
it "can fmap the result of an already finished async" $ do it "can fmap the result of an already finished async" $ do
avar <- newAsyncVar :: IO (AsyncVar ()) avar <- newAsyncVar :: IO (AsyncVar ())
putAsyncVar_ avar () putAsyncVar_ avar ()
runAsyncIO (id <$> await avar) withDefaultResourceManager (id <$> await avar)
it "can fmap the result of an async that is completed later" $ do it "can fmap the result of an async that is completed later" $ do
avar <- newAsyncVar :: IO (AsyncVar ()) avar <- newAsyncVar :: IO (AsyncVar ())
void $ forkIO $ do void $ forkIO $ do
threadDelay 100000 threadDelay 100000
putAsyncVar_ avar () putAsyncVar_ avar ()
runAsyncIO (id <$> await avar) withDefaultResourceManager (id <$> await avar)
it "can bind the result of an already finished async" $ do it "can bind the result of an already finished async" $ do
avar <- newAsyncVar :: IO (AsyncVar ()) avar <- newAsyncVar :: IO (AsyncVar ())
putAsyncVar_ avar () putAsyncVar_ avar ()
runAsyncIO (await avar >>= pure) withDefaultResourceManager (await avar >>= pure)
it "can bind the result of an async that is completed later" $ do it "can bind the result of an async that is completed later" $ do
avar <- newAsyncVar :: IO (AsyncVar ()) avar <- newAsyncVar :: IO (AsyncVar ())
void $ forkIO $ do void $ forkIO $ do
threadDelay 100000 threadDelay 100000
putAsyncVar_ avar () putAsyncVar_ avar ()
runAsyncIO (await avar >>= pure) withDefaultResourceManager (await avar >>= pure)
it "can terminate when encountering an asynchronous exception" $ do it "can terminate when encountering an asynchronous exception" $ do
never <- newAsyncVar :: IO (AsyncVar ()) never <- newAsyncVar :: IO (AsyncVar ())
result <- timeout 100000 $ runAsyncIO $ result <- timeout 100000 $ withDefaultResourceManager $
-- Use bind to create an AsyncIOPlumbing, which is the interesting case that uses `uninterruptibleMask` when run -- Use bind to create an AsyncIOPlumbing, which is the interesting case that uses `uninterruptibleMask` when run
await never >>= pure await never >>= pure
result `shouldBe` Nothing result `shouldBe` Nothing
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment