Skip to content
Snippets Groups Projects
Commit 1549b0f8 authored by Jens Nolte's avatar Jens Nolte
Browse files

Change await to perform smaller STM transactions


Co-authored-by: default avatarJan Beinke <git@janbeinke.com>
parent 461d4de5
No related branches found
No related tags found
No related merge requests found
...@@ -2,12 +2,10 @@ module Quasar.Awaitable ( ...@@ -2,12 +2,10 @@ module Quasar.Awaitable (
-- * Awaitable -- * Awaitable
IsAwaitable(..), IsAwaitable(..),
awaitIO, awaitIO,
awaitSTM,
Awaitable, Awaitable,
successfulAwaitable, successfulAwaitable,
failedAwaitable, failedAwaitable,
completedAwaitable, completedAwaitable,
awaitableFromSTM,
peekAwaitable, peekAwaitable,
-- * AsyncVar -- * AsyncVar
...@@ -26,11 +24,12 @@ module Quasar.Awaitable ( ...@@ -26,11 +24,12 @@ module Quasar.Awaitable (
import Control.Concurrent.STM import Control.Concurrent.STM
import Control.Monad.Catch import Control.Monad.Catch
import Data.Bifunctor (bimap)
import Quasar.Prelude import Quasar.Prelude
class IsAwaitable r a | a -> r where class IsAwaitable r a | a -> r where
peekSTM :: a -> STM (Maybe (Either SomeException r)) peekSTM :: a -> STM (Maybe (Either (Awaitable r) (Either SomeException r)))
peekSTM = peekSTM . toAwaitable peekSTM = peekSTM . toAwaitable
toAwaitable :: a -> Awaitable r toAwaitable :: a -> Awaitable r
...@@ -39,26 +38,27 @@ class IsAwaitable r a | a -> r where ...@@ -39,26 +38,27 @@ class IsAwaitable r a | a -> r where
{-# MINIMAL toAwaitable | peekSTM #-} {-# MINIMAL toAwaitable | peekSTM #-}
-- | Wait until the promise is settled and return the result.
awaitSTM :: IsAwaitable r a => a -> STM (Either SomeException r)
awaitSTM = peekSTM >=> maybe retry pure
awaitIO :: (IsAwaitable r a, MonadIO m) => a -> m r awaitIO :: (IsAwaitable r a, MonadIO m) => a -> m r
awaitIO action = liftIO $ either throwIO pure =<< atomically (awaitSTM action) awaitIO input = liftIO $ either throwIO pure =<< go (toAwaitable input)
where
go :: Awaitable r -> IO (Either SomeException r)
go x = do
stepResult <- atomically $ maybe retry pure =<< peekSTM x
either go pure stepResult
newtype Awaitable r = Awaitable (STM (Maybe (Either SomeException r))) newtype Awaitable r = Awaitable (STM (Maybe (Either (Awaitable r) (Either SomeException r))))
instance IsAwaitable r (Awaitable r) where instance IsAwaitable r (Awaitable r) where
peekSTM (Awaitable x) = x peekSTM (Awaitable x) = x
toAwaitable = id toAwaitable = id
instance Functor Awaitable where instance Functor Awaitable where
fmap fn = Awaitable . fmap (fmap (fmap fn)) . peekSTM fmap fn = Awaitable . fmap (fmap (bimap (fmap fn) (fmap fn))) . peekSTM
completedAwaitable :: Either SomeException r -> Awaitable r completedAwaitable :: Either SomeException r -> Awaitable r
completedAwaitable = Awaitable . pure . Just completedAwaitable = Awaitable . pure . Just . Right
successfulAwaitable :: r -> Awaitable r successfulAwaitable :: r -> Awaitable r
successfulAwaitable = completedAwaitable . Right successfulAwaitable = completedAwaitable . Right
...@@ -68,19 +68,26 @@ failedAwaitable = completedAwaitable . Left ...@@ -68,19 +68,26 @@ failedAwaitable = completedAwaitable . Left
peekAwaitable :: (IsAwaitable r a, MonadIO m) => a -> m (Maybe (Either SomeException r)) peekAwaitable :: (IsAwaitable r a, MonadIO m) => a -> m (Maybe (Either SomeException r))
peekAwaitable = liftIO . atomically . peekSTM peekAwaitable input = liftIO $ go (toAwaitable input)
where
go :: Awaitable r -> IO (Maybe (Either SomeException r))
go x = atomically (peekSTM x) >>= \case
Nothing -> pure Nothing
Just (Right result) -> pure $ Just result
Just (Left step) -> go step
awaitableFromSTM :: STM (Maybe (Either SomeException r)) -> IO (Awaitable r) -- | Cache an `Awaitable`
awaitableFromSTM fn = do --awaitableFromSTM :: STM (Maybe (Either SomeException r)) -> IO (Awaitable r)
cache <- newTVarIO (Left fn) --awaitableFromSTM fn = do
pure . Awaitable $ -- cache <- newTVarIO (Left fn)
readTVar cache >>= \case -- pure . Awaitable $
Left generatorFn -> do -- readTVar cache >>= \case
value <- generatorFn -- Left generatorFn -> do
writeTVar cache (Right value) -- value <- generatorFn
pure value -- writeTVar cache (Right value)
Right value -> pure value -- pure value
-- Right value -> pure value
...@@ -90,7 +97,7 @@ awaitableFromSTM fn = do ...@@ -90,7 +97,7 @@ awaitableFromSTM fn = do
newtype AsyncVar r = AsyncVar (TMVar (Either SomeException r)) newtype AsyncVar r = AsyncVar (TMVar (Either SomeException r))
instance IsAwaitable r (AsyncVar r) where instance IsAwaitable r (AsyncVar r) where
peekSTM (AsyncVar var) = tryReadTMVar var peekSTM (AsyncVar var) = fmap Right <$> tryReadTMVar var
newAsyncVarSTM :: STM (AsyncVar r) newAsyncVarSTM :: STM (AsyncVar r)
...@@ -124,3 +131,21 @@ putAsyncVarEither_ var = void . putAsyncVarEither var ...@@ -124,3 +131,21 @@ putAsyncVarEither_ var = void . putAsyncVarEither var
putAsyncVarEitherSTM_ :: AsyncVar a -> Either SomeException a -> STM () putAsyncVarEitherSTM_ :: AsyncVar a -> Either SomeException a -> STM ()
putAsyncVarEitherSTM_ var = void . putAsyncVarEitherSTM var putAsyncVarEitherSTM_ var = void . putAsyncVarEitherSTM var
-- * Awaiting multiple asyncs
-- TODO
--awaitEither :: (IsAwaitable ra a , IsAwaitable rb b, MonadIO m) => a -> b -> m (Awaitable (Either ra rb))
--awaitEither x y = liftIO $ awaitableFromSTM $ peekEitherSTM x y
--
--peekEitherSTM :: (IsAwaitable ra a , IsAwaitable rb b) => a -> b -> STM (Maybe (Either SomeException (Either ra rb)))
--peekEitherSTM x y =
-- peekSTM x >>= \case
-- Just (Left ex) -> pure (Just (Left ex))
-- Just (Right r) -> pure (Just (Right (Left r)))
-- Nothing -> peekSTM y >>= \case
-- Just (Left ex) -> pure (Just (Left ex))
-- Just (Right r) -> pure (Just (Right (Right r)))
-- Nothing -> pure Nothing
...@@ -10,7 +10,6 @@ module Quasar.Core ( ...@@ -10,7 +10,6 @@ module Quasar.Core (
import Control.Concurrent (ThreadId, forkIO, forkIOWithUnmask, myThreadId) import Control.Concurrent (ThreadId, forkIO, forkIOWithUnmask, myThreadId)
import Control.Concurrent.STM import Control.Concurrent.STM
import Control.Exception (MaskingState(..), getMaskingState)
import Control.Monad.Catch import Control.Monad.Catch
import Control.Monad.Reader import Control.Monad.Reader
import Data.HashSet import Data.HashSet
...@@ -65,12 +64,7 @@ runAsyncIO = withDefaultPool ...@@ -65,12 +64,7 @@ runAsyncIO = withDefaultPool
awaitResult :: AsyncIO (Awaitable r) -> AsyncIO r awaitResult :: AsyncIO (Awaitable r) -> AsyncIO r
awaitResult = (await =<<) awaitResult = (await =<<)
-- TODO rename -- TODO rename to ResourceManager
-- AsyncIOPool
-- AsyncPool
-- ThreadPool
-- AsyncIORuntime
-- AsyncIOContext
data Pool = Pool { data Pool = Pool {
configuration :: PoolConfiguraiton, configuration :: PoolConfiguraiton,
threads :: TVar (HashSet ThreadId) threads :: TVar (HashSet ThreadId)
...@@ -80,8 +74,14 @@ newtype AsyncTask r = AsyncTask (Awaitable r) ...@@ -80,8 +74,14 @@ newtype AsyncTask r = AsyncTask (Awaitable r)
instance IsAwaitable r (AsyncTask r) where instance IsAwaitable r (AsyncTask r) where
toAwaitable (AsyncTask awaitable) = awaitable toAwaitable (AsyncTask awaitable) = awaitable
data CancelTask data CancelTask = CancelTask
data CancelledTaskAwaited deriving stock Show
instance Exception CancelTask where
data CancelledTask = CancelledTask
deriving stock Show
instance Exception CancelledTask where
data PoolConfiguraiton = PoolConfiguraiton data PoolConfiguraiton = PoolConfiguraiton
...@@ -109,26 +109,3 @@ newPool configuration = do ...@@ -109,26 +109,3 @@ newPool configuration = do
disposePool :: Pool -> IO () disposePool :: Pool -> IO ()
-- TODO resource management -- TODO resource management
disposePool = const (pure ()) disposePool = const (pure ())
-- * Awaiting multiple asyncs
awaitEither :: (IsAwaitable ra a , IsAwaitable rb b) => a -> b -> AsyncIO (Either ra rb)
awaitEither x y = await =<< liftIO (awaitEitherPlumbing x y)
awaitEitherPlumbing :: (IsAwaitable ra a , IsAwaitable rb b) => a -> b -> IO (Awaitable (Either ra rb))
awaitEitherPlumbing x y = awaitableFromSTM $ peekEitherSTM x y
peekEitherSTM :: (IsAwaitable ra a , IsAwaitable rb b) => a -> b -> STM (Maybe (Either SomeException (Either ra rb)))
peekEitherSTM x y =
peekSTM x >>= \case
Just (Left ex) -> pure (Just (Left ex))
Just (Right r) -> pure (Just (Right (Left r)))
Nothing -> peekSTM y >>= \case
Just (Left ex) -> pure (Just (Left ex))
Just (Right r) -> pure (Just (Right (Right r)))
Nothing -> pure Nothing
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment