From 1549b0f828417b8a5b0c4c57d4815f7c2df8d20c Mon Sep 17 00:00:00 2001 From: Jens Nolte <git@queezle.net> Date: Sat, 31 Jul 2021 00:42:36 +0200 Subject: [PATCH] Change await to perform smaller STM transactions Co-authored-by: Jan Beinke <git@janbeinke.com> --- src/Quasar/Awaitable.hs | 71 ++++++++++++++++++++++++++++------------- src/Quasar/Core.hs | 41 ++++++------------------ 2 files changed, 57 insertions(+), 55 deletions(-) diff --git a/src/Quasar/Awaitable.hs b/src/Quasar/Awaitable.hs index de4a5ab..63d72e6 100644 --- a/src/Quasar/Awaitable.hs +++ b/src/Quasar/Awaitable.hs @@ -2,12 +2,10 @@ module Quasar.Awaitable ( -- * Awaitable IsAwaitable(..), awaitIO, - awaitSTM, Awaitable, successfulAwaitable, failedAwaitable, completedAwaitable, - awaitableFromSTM, peekAwaitable, -- * AsyncVar @@ -26,11 +24,12 @@ module Quasar.Awaitable ( import Control.Concurrent.STM import Control.Monad.Catch +import Data.Bifunctor (bimap) import Quasar.Prelude class IsAwaitable r a | a -> r where - peekSTM :: a -> STM (Maybe (Either SomeException r)) + peekSTM :: a -> STM (Maybe (Either (Awaitable r) (Either SomeException r))) peekSTM = peekSTM . toAwaitable toAwaitable :: a -> Awaitable r @@ -39,26 +38,27 @@ class IsAwaitable r a | a -> r where {-# MINIMAL toAwaitable | peekSTM #-} --- | Wait until the promise is settled and return the result. -awaitSTM :: IsAwaitable r a => a -> STM (Either SomeException r) -awaitSTM = peekSTM >=> maybe retry pure - awaitIO :: (IsAwaitable r a, MonadIO m) => a -> m r -awaitIO action = liftIO $ either throwIO pure =<< atomically (awaitSTM action) +awaitIO input = liftIO $ either throwIO pure =<< go (toAwaitable input) + where + go :: Awaitable r -> IO (Either SomeException r) + go x = do + stepResult <- atomically $ maybe retry pure =<< peekSTM x + either go pure stepResult -newtype Awaitable r = Awaitable (STM (Maybe (Either SomeException r))) +newtype Awaitable r = Awaitable (STM (Maybe (Either (Awaitable r) (Either SomeException r)))) instance IsAwaitable r (Awaitable r) where peekSTM (Awaitable x) = x toAwaitable = id instance Functor Awaitable where - fmap fn = Awaitable . fmap (fmap (fmap fn)) . peekSTM + fmap fn = Awaitable . fmap (fmap (bimap (fmap fn) (fmap fn))) . peekSTM completedAwaitable :: Either SomeException r -> Awaitable r -completedAwaitable = Awaitable . pure . Just +completedAwaitable = Awaitable . pure . Just . Right successfulAwaitable :: r -> Awaitable r successfulAwaitable = completedAwaitable . Right @@ -68,19 +68,26 @@ failedAwaitable = completedAwaitable . Left peekAwaitable :: (IsAwaitable r a, MonadIO m) => a -> m (Maybe (Either SomeException r)) -peekAwaitable = liftIO . atomically . peekSTM +peekAwaitable input = liftIO $ go (toAwaitable input) + where + go :: Awaitable r -> IO (Maybe (Either SomeException r)) + go x = atomically (peekSTM x) >>= \case + Nothing -> pure Nothing + Just (Right result) -> pure $ Just result + Just (Left step) -> go step -awaitableFromSTM :: STM (Maybe (Either SomeException r)) -> IO (Awaitable r) -awaitableFromSTM fn = do - cache <- newTVarIO (Left fn) - pure . Awaitable $ - readTVar cache >>= \case - Left generatorFn -> do - value <- generatorFn - writeTVar cache (Right value) - pure value - Right value -> pure value +-- | Cache an `Awaitable` +--awaitableFromSTM :: STM (Maybe (Either SomeException r)) -> IO (Awaitable r) +--awaitableFromSTM fn = do +-- cache <- newTVarIO (Left fn) +-- pure . Awaitable $ +-- readTVar cache >>= \case +-- Left generatorFn -> do +-- value <- generatorFn +-- writeTVar cache (Right value) +-- pure value +-- Right value -> pure value @@ -90,7 +97,7 @@ awaitableFromSTM fn = do newtype AsyncVar r = AsyncVar (TMVar (Either SomeException r)) instance IsAwaitable r (AsyncVar r) where - peekSTM (AsyncVar var) = tryReadTMVar var + peekSTM (AsyncVar var) = fmap Right <$> tryReadTMVar var newAsyncVarSTM :: STM (AsyncVar r) @@ -124,3 +131,21 @@ putAsyncVarEither_ var = void . putAsyncVarEither var putAsyncVarEitherSTM_ :: AsyncVar a -> Either SomeException a -> STM () putAsyncVarEitherSTM_ var = void . putAsyncVarEitherSTM var + + + +-- * Awaiting multiple asyncs + +-- TODO +--awaitEither :: (IsAwaitable ra a , IsAwaitable rb b, MonadIO m) => a -> b -> m (Awaitable (Either ra rb)) +--awaitEither x y = liftIO $ awaitableFromSTM $ peekEitherSTM x y +-- +--peekEitherSTM :: (IsAwaitable ra a , IsAwaitable rb b) => a -> b -> STM (Maybe (Either SomeException (Either ra rb))) +--peekEitherSTM x y = +-- peekSTM x >>= \case +-- Just (Left ex) -> pure (Just (Left ex)) +-- Just (Right r) -> pure (Just (Right (Left r))) +-- Nothing -> peekSTM y >>= \case +-- Just (Left ex) -> pure (Just (Left ex)) +-- Just (Right r) -> pure (Just (Right (Right r))) +-- Nothing -> pure Nothing diff --git a/src/Quasar/Core.hs b/src/Quasar/Core.hs index 0fd5b3a..42fa5f1 100644 --- a/src/Quasar/Core.hs +++ b/src/Quasar/Core.hs @@ -10,7 +10,6 @@ module Quasar.Core ( import Control.Concurrent (ThreadId, forkIO, forkIOWithUnmask, myThreadId) import Control.Concurrent.STM -import Control.Exception (MaskingState(..), getMaskingState) import Control.Monad.Catch import Control.Monad.Reader import Data.HashSet @@ -65,12 +64,7 @@ runAsyncIO = withDefaultPool awaitResult :: AsyncIO (Awaitable r) -> AsyncIO r awaitResult = (await =<<) --- TODO rename --- AsyncIOPool --- AsyncPool --- ThreadPool --- AsyncIORuntime --- AsyncIOContext +-- TODO rename to ResourceManager data Pool = Pool { configuration :: PoolConfiguraiton, threads :: TVar (HashSet ThreadId) @@ -80,8 +74,14 @@ newtype AsyncTask r = AsyncTask (Awaitable r) instance IsAwaitable r (AsyncTask r) where toAwaitable (AsyncTask awaitable) = awaitable -data CancelTask -data CancelledTaskAwaited +data CancelTask = CancelTask + deriving stock Show +instance Exception CancelTask where + +data CancelledTask = CancelledTask + deriving stock Show +instance Exception CancelledTask where + data PoolConfiguraiton = PoolConfiguraiton @@ -109,26 +109,3 @@ newPool configuration = do disposePool :: Pool -> IO () -- TODO resource management disposePool = const (pure ()) - - - - - - --- * Awaiting multiple asyncs - -awaitEither :: (IsAwaitable ra a , IsAwaitable rb b) => a -> b -> AsyncIO (Either ra rb) -awaitEither x y = await =<< liftIO (awaitEitherPlumbing x y) - -awaitEitherPlumbing :: (IsAwaitable ra a , IsAwaitable rb b) => a -> b -> IO (Awaitable (Either ra rb)) -awaitEitherPlumbing x y = awaitableFromSTM $ peekEitherSTM x y - -peekEitherSTM :: (IsAwaitable ra a , IsAwaitable rb b) => a -> b -> STM (Maybe (Either SomeException (Either ra rb))) -peekEitherSTM x y = - peekSTM x >>= \case - Just (Left ex) -> pure (Just (Left ex)) - Just (Right r) -> pure (Just (Right (Left r))) - Nothing -> peekSTM y >>= \case - Just (Left ex) -> pure (Just (Left ex)) - Just (Right r) -> pure (Just (Right (Right r))) - Nothing -> pure Nothing -- GitLab