Commit 7167f4ce
Simplify runAwaitable implementations by requiring MonadQuerySTM class

parent 9700867f
......@@ -2,11 +2,14 @@ module Quasar.Awaitable (
-- * Awaitable
-- * Awaiting multiple awaitables
-- * AsyncVar
......@@ -35,7 +38,7 @@ import Quasar.Prelude
class IsAwaitable r a | a -> r where
runAwaitable :: (Monad m) => a -> (forall b. STM (Maybe b) -> m b) -> m (Either SomeException r)
runAwaitable :: (MonadQuerySTM m) => a -> m (Either SomeException r)
runAwaitable self = runAwaitable (toAwaitable self)
toAwaitable :: a -> Awaitable r
......@@ -45,35 +48,35 @@ class IsAwaitable r a | a -> r where
awaitIO :: (IsAwaitable r a, MonadIO m) => a -> m r
awaitIO awaitable = liftIO $ either throwIO pure =<< runAwaitable awaitable (atomically . (maybe retry pure =<<))
awaitIO awaitable = liftIO $ either throwIO pure =<< runQueryT (atomically . (maybe retry pure =<<)) (runAwaitable awaitable)
peekAwaitable :: (IsAwaitable r a, MonadIO m) => a -> m (Maybe (Either SomeException r))
peekAwaitable awaitable = liftIO . runMaybeT $ runAwaitable awaitable (MaybeT . atomically)
peekAwaitable awaitable = liftIO $ runMaybeT $ runQueryT (MaybeT . atomically) (runAwaitable awaitable)
newtype Awaitable r = Awaitable (forall m. (Monad m) => (forall b. STM (Maybe b) -> m b) -> m (Either SomeException r))
newtype Awaitable r = Awaitable (forall m. (MonadQuerySTM m) => m (Either SomeException r))
instance IsAwaitable r (Awaitable r) where
runAwaitable (Awaitable x) = x
toAwaitable = id
instance Functor Awaitable where
fmap fn (Awaitable x) = Awaitable $ \querySTM -> fn <<$>> x querySTM
fmap fn (Awaitable x) = Awaitable $ fn <<$>> x
instance Applicative Awaitable where
pure value = Awaitable $ \_ -> pure (Right value)
liftA2 fn (Awaitable fx) (Awaitable fy) = Awaitable $ \querySTM -> liftA2 (liftA2 fn) (fx querySTM) (fy querySTM)
pure value = Awaitable $ pure (Right value)
liftA2 fn (Awaitable fx) (Awaitable fy) = Awaitable $ liftA2 (liftA2 fn) fx fy
instance Monad Awaitable where
(Awaitable fx) >>= fn = Awaitable $ \querySTM -> do
fx querySTM >>= \case
(Awaitable fx) >>= fn = Awaitable $ do
fx >>= \case
Left ex -> pure $ Left ex
Right x -> runAwaitable (fn x) querySTM
Right x -> runAwaitable (fn x)
completedAwaitable :: Either SomeException r -> Awaitable r
completedAwaitable result = Awaitable $ \_ -> pure result
completedAwaitable result = Awaitable $ pure result
successfulAwaitable :: r -> Awaitable r
successfulAwaitable = completedAwaitable . Right
......@@ -82,25 +85,31 @@ failedAwaitable :: SomeException -> Awaitable r
failedAwaitable = completedAwaitable . Left
simpleAwaitable :: STM (Maybe (Either SomeException a)) -> Awaitable a
simpleAwaitable peekTransaction = Awaitable ($ peekTransaction)
simpleAwaitable query = Awaitable (querySTM query)
class Monad m => MonadQuerySTM m where
querySTM :: (forall a. STM (Maybe a) -> m a)
instance Monad m => MonadQuerySTM (ReaderT (QuerySTMFunction m) m) where
instance Monad m => MonadQuerySTM (ReaderT (QueryFn m) m) where
querySTM query = do
QuerySTMFunction querySTMFn <- ask
QueryFn querySTMFn <- ask
lift $ querySTMFn query
data QuerySTMFunction m = QuerySTMFunction (forall b. STM (Maybe b) -> m b)
data QueryFn m = QueryFn (forall a. STM (Maybe a) -> m a)
runQueryT :: forall m a. Monad m => (forall b. STM (Maybe b) -> m b) -> ReaderT (QueryFn m) m a -> m a
runQueryT queryFn action = runReaderT action (QueryFn queryFn)
newtype CachedAwaitable r = CachedAwaitable (TVar (AwaitableStepM (Either SomeException r)))
cacheAwaitable :: Awaitable a -> IO (CachedAwaitable a)
cacheAwaitable awaitable = CachedAwaitable <$> newTVarIO (runAwaitable awaitable)
instance IsAwaitable r (CachedAwaitable r) where
runAwaitable :: forall m. (Monad m) => CachedAwaitable r -> (forall b. STM (Maybe b) -> m b) -> m (Either SomeException r)
runAwaitable (CachedAwaitable tvar) querySTM = go
runAwaitable :: forall m. (MonadQuerySTM m) => CachedAwaitable r -> m (Either SomeException r)
runAwaitable (CachedAwaitable tvar) = go
go :: m (Either SomeException r)
go = querySTM stepCacheTransaction >>= \case
......@@ -113,26 +122,23 @@ instance IsAwaitable r (CachedAwaitable r) where
readTVar tvar >>= \case
-- Cache was already completed
result@(AwaitableCompleted _) -> pure $ Just result
AwaitableStep transaction fn -> do
-- Run the next "querySTM" transaction requested by the cached operation
fn <<$>> transaction >>= \case
-- In case of an incomplete transaction the caller (/ the monad `m`) can decide what to do (e.g. retry for `awaitIO`, abort for `peekAwaitable`)
AwaitableStep query fn -> do
-- Run the next "querySTM" query requested by the cached operation
fn <<$>> query >>= \case
-- In case of an incomplete query the caller (/ the monad `m`) can decide what to do (e.g. retry for `awaitIO`, abort for `peekAwaitable`)
Nothing -> pure Nothing
-- Query was successful. Update cache and exit transaction
-- Query was successful. Update cache and exit query
Just nextStep -> do
writeTVar tvar nextStep
pure $ Just nextStep
cacheAwaitable :: Awaitable a -> IO (CachedAwaitable a)
cacheAwaitable awaitable = CachedAwaitable <$> newTVarIO (peekM awaitable peekStep)
data AwaitableStepM a
= AwaitableCompleted a
| forall b. AwaitableStep (STM (Maybe b)) (b -> AwaitableStepM a)
instance Functor AwaitableStepM where
fmap fn (AwaitableCompleted x) = AwaitableCompleted (fn x)
fmap fn (AwaitableStep transaction next) = AwaitableStep transaction (fmap fn <$> next)
fmap fn (AwaitableStep query next) = AwaitableStep query (fmap fn <$> next)
instance Applicative AwaitableStepM where
pure = AwaitableCompleted
......@@ -140,14 +146,10 @@ instance Applicative AwaitableStepM where
instance Monad AwaitableStepM where
(AwaitableCompleted x) >>= fn = fn x
(AwaitableStep transaction next) >>= fn = AwaitableStep transaction (next >=> fn)
(AwaitableStep query next) >>= fn = AwaitableStep query (next >=> fn)
instance MonadQuerySTM AwaitableStepM where
querySTM transaction = AwaitableStep transaction AwaitableCompleted
peekStep :: STM (Maybe a) -> AwaitableStepM a
peekStep transaction = AwaitableStep transaction AwaitableCompleted
querySTM query = AwaitableStep query AwaitableCompleted
-- ** AsyncVar
......@@ -156,7 +158,7 @@ peekStep transaction = AwaitableStep transaction AwaitableCompleted
newtype AsyncVar r = AsyncVar (TMVar (Either SomeException r))
instance IsAwaitable r (AsyncVar r) where
runAwaitable (AsyncVar var) = ($ tryReadTMVar var)
runAwaitable (AsyncVar var) = querySTM $ tryReadTMVar var
newAsyncVarSTM :: STM (AsyncVar r)
......@@ -197,17 +199,17 @@ putAsyncVarEitherSTM_ var = void . putAsyncVarEitherSTM var
awaitEither :: (IsAwaitable ra a , IsAwaitable rb b, MonadIO m) => a -> b -> m (Awaitable (Either ra rb))
awaitEither x y = liftIO $ do
let startX = runAwaitable x peekStep
let startY = runAwaitable y peekStep
pure $ Awaitable $ \querySTM -> groupLefts <$> stepBoth startX startY querySTM
let startX = runAwaitable x
let startY = runAwaitable y
pure $ Awaitable $ groupLefts <$> stepBoth startX startY
stepBoth :: Monad m => AwaitableStepM ra -> AwaitableStepM rb -> (forall c. STM (Maybe c) -> m c) -> m (Either ra rb)
stepBoth (AwaitableCompleted resultX) _ _ = pure $ Left resultX
stepBoth _ (AwaitableCompleted resultY) _ = pure $ Right resultY
stepBoth stepX@(AwaitableStep transactionX nextX) stepY@(AwaitableStep transactionY nextY) querySTM = do
stepBoth :: MonadQuerySTM m => AwaitableStepM ra -> AwaitableStepM rb -> m (Either ra rb)
stepBoth (AwaitableCompleted resultX) _ = pure $ Left resultX
stepBoth _ (AwaitableCompleted resultY) = pure $ Right resultY
stepBoth stepX@(AwaitableStep transactionX nextX) stepY@(AwaitableStep transactionY nextY) = do
querySTM (peekEitherSTM transactionX transactionY) >>= \case
Left resultX -> stepBoth (nextX resultX) stepY querySTM
Right resultY -> stepBoth stepX (nextY resultY) querySTM
Left resultX -> stepBoth (nextX resultX) stepY
Right resultY -> stepBoth stepX (nextY resultY)
groupLefts :: Either (Either ex a) (Either ex b) -> Either ex (Either a b)
