Skip to content
Snippets Groups Projects
Commit 9700867f authored by Jens Nolte's avatar Jens Nolte
Browse files

Add Monad instance to Awaitable with support for per-step caching

parent 1549b0f8
No related branches found
No related tags found
No related merge requests found
......@@ -7,6 +7,7 @@ module Quasar.Awaitable (
failedAwaitable,
completedAwaitable,
peekAwaitable,
awaitEither,
-- * AsyncVar
AsyncVar,
......@@ -24,41 +25,55 @@ module Quasar.Awaitable (
import Control.Concurrent.STM
import Control.Monad.Catch
import Control.Monad.Fix (mfix)
import Control.Monad.Reader
import Control.Monad.Trans.Class
import Control.Monad.Trans.Maybe
import Data.Bifunctor (bimap)
import Quasar.Prelude
class IsAwaitable r a | a -> r where
peekSTM :: a -> STM (Maybe (Either (Awaitable r) (Either SomeException r)))
peekSTM = peekSTM . toAwaitable
runAwaitable :: (Monad m) => a -> (forall b. STM (Maybe b) -> m b) -> m (Either SomeException r)
runAwaitable self = runAwaitable (toAwaitable self)
toAwaitable :: a -> Awaitable r
toAwaitable x = Awaitable (peekSTM x)
toAwaitable x = Awaitable $ runAwaitable x
{-# MINIMAL toAwaitable | peekSTM #-}
{-# MINIMAL toAwaitable | runAwaitable #-}
awaitIO :: (IsAwaitable r a, MonadIO m) => a -> m r
awaitIO input = liftIO $ either throwIO pure =<< go (toAwaitable input)
where
go :: Awaitable r -> IO (Either SomeException r)
go x = do
stepResult <- atomically $ maybe retry pure =<< peekSTM x
either go pure stepResult
awaitIO awaitable = liftIO $ either throwIO pure =<< runAwaitable awaitable (atomically . (maybe retry pure =<<))
peekAwaitable :: (IsAwaitable r a, MonadIO m) => a -> m (Maybe (Either SomeException r))
peekAwaitable awaitable = liftIO . runMaybeT $ runAwaitable awaitable (MaybeT . atomically)
newtype Awaitable r = Awaitable (STM (Maybe (Either (Awaitable r) (Either SomeException r))))
newtype Awaitable r = Awaitable (forall m. (Monad m) => (forall b. STM (Maybe b) -> m b) -> m (Either SomeException r))
instance IsAwaitable r (Awaitable r) where
peekSTM (Awaitable x) = x
runAwaitable (Awaitable x) = x
toAwaitable = id
instance Functor Awaitable where
fmap fn = Awaitable . fmap (fmap (bimap (fmap fn) (fmap fn))) . peekSTM
fmap fn (Awaitable x) = Awaitable $ \querySTM -> fn <<$>> x querySTM
instance Applicative Awaitable where
pure value = Awaitable $ \_ -> pure (Right value)
liftA2 fn (Awaitable fx) (Awaitable fy) = Awaitable $ \querySTM -> liftA2 (liftA2 fn) (fx querySTM) (fy querySTM)
instance Monad Awaitable where
(Awaitable fx) >>= fn = Awaitable $ \querySTM -> do
fx querySTM >>= \case
Left ex -> pure $ Left ex
Right x -> runAwaitable (fn x) querySTM
completedAwaitable :: Either SomeException r -> Awaitable r
completedAwaitable = Awaitable . pure . Just . Right
completedAwaitable result = Awaitable $ \_ -> pure result
successfulAwaitable :: r -> Awaitable r
successfulAwaitable = completedAwaitable . Right
......@@ -66,29 +81,73 @@ successfulAwaitable = completedAwaitable . Right
failedAwaitable :: SomeException -> Awaitable r
failedAwaitable = completedAwaitable . Left
simpleAwaitable :: STM (Maybe (Either SomeException a)) -> Awaitable a
simpleAwaitable peekTransaction = Awaitable ($ peekTransaction)
peekAwaitable :: (IsAwaitable r a, MonadIO m) => a -> m (Maybe (Either SomeException r))
peekAwaitable input = liftIO $ go (toAwaitable input)
where
go :: Awaitable r -> IO (Maybe (Either SomeException r))
go x = atomically (peekSTM x) >>= \case
Nothing -> pure Nothing
Just (Right result) -> pure $ Just result
Just (Left step) -> go step
class Monad m => MonadQuerySTM m where
querySTM :: (forall a. STM (Maybe a) -> m a)
instance Monad m => MonadQuerySTM (ReaderT (QuerySTMFunction m) m) where
querySTM query = do
QuerySTMFunction querySTMFn <- ask
lift $ querySTMFn query
data QuerySTMFunction m = QuerySTMFunction (forall b. STM (Maybe b) -> m b)
-- | Cache an `Awaitable`
--awaitableFromSTM :: STM (Maybe (Either SomeException r)) -> IO (Awaitable r)
--awaitableFromSTM fn = do
-- cache <- newTVarIO (Left fn)
-- pure . Awaitable $
-- readTVar cache >>= \case
-- Left generatorFn -> do
-- value <- generatorFn
-- writeTVar cache (Right value)
-- pure value
-- Right value -> pure value
newtype CachedAwaitable r = CachedAwaitable (TVar (AwaitableStepM (Either SomeException r)))
instance IsAwaitable r (CachedAwaitable r) where
runAwaitable :: forall m. (Monad m) => CachedAwaitable r -> (forall b. STM (Maybe b) -> m b) -> m (Either SomeException r)
runAwaitable (CachedAwaitable tvar) querySTM = go
where
go :: m (Either SomeException r)
go = querySTM stepCacheTransaction >>= \case
AwaitableCompleted result -> pure result
-- Cached operation is not yet completed
_ -> go
stepCacheTransaction :: STM (Maybe (AwaitableStepM (Either SomeException r)))
stepCacheTransaction = do
readTVar tvar >>= \case
-- Cache was already completed
result@(AwaitableCompleted _) -> pure $ Just result
AwaitableStep transaction fn -> do
-- Run the next "querySTM" transaction requested by the cached operation
fn <<$>> transaction >>= \case
-- In case of an incomplete transaction the caller (/ the monad `m`) can decide what to do (e.g. retry for `awaitIO`, abort for `peekAwaitable`)
Nothing -> pure Nothing
-- Query was successful. Update cache and exit transaction
Just nextStep -> do
writeTVar tvar nextStep
pure $ Just nextStep
cacheAwaitable :: Awaitable a -> IO (CachedAwaitable a)
cacheAwaitable awaitable = CachedAwaitable <$> newTVarIO (peekM awaitable peekStep)
data AwaitableStepM a
= AwaitableCompleted a
| forall b. AwaitableStep (STM (Maybe b)) (b -> AwaitableStepM a)
instance Functor AwaitableStepM where
fmap fn (AwaitableCompleted x) = AwaitableCompleted (fn x)
fmap fn (AwaitableStep transaction next) = AwaitableStep transaction (fmap fn <$> next)
instance Applicative AwaitableStepM where
pure = AwaitableCompleted
liftA2 fn fx fy = fx >>= \x -> fn x <$> fy
instance Monad AwaitableStepM where
(AwaitableCompleted x) >>= fn = fn x
(AwaitableStep transaction next) >>= fn = AwaitableStep transaction (next >=> fn)
instance MonadQuerySTM AwaitableStepM where
querySTM transaction = AwaitableStep transaction AwaitableCompleted
peekStep :: STM (Maybe a) -> AwaitableStepM a
peekStep transaction = AwaitableStep transaction AwaitableCompleted
-- ** AsyncVar
......@@ -97,7 +156,7 @@ peekAwaitable input = liftIO $ go (toAwaitable input)
newtype AsyncVar r = AsyncVar (TMVar (Either SomeException r))
instance IsAwaitable r (AsyncVar r) where
peekSTM (AsyncVar var) = fmap Right <$> tryReadTMVar var
runAwaitable (AsyncVar var) = ($ tryReadTMVar var)
newAsyncVarSTM :: STM (AsyncVar r)
......@@ -136,16 +195,29 @@ putAsyncVarEitherSTM_ var = void . putAsyncVarEitherSTM var
-- * Awaiting multiple asyncs
-- TODO
--awaitEither :: (IsAwaitable ra a , IsAwaitable rb b, MonadIO m) => a -> b -> m (Awaitable (Either ra rb))
--awaitEither x y = liftIO $ awaitableFromSTM $ peekEitherSTM x y
--
--peekEitherSTM :: (IsAwaitable ra a , IsAwaitable rb b) => a -> b -> STM (Maybe (Either SomeException (Either ra rb)))
--peekEitherSTM x y =
-- peekSTM x >>= \case
-- Just (Left ex) -> pure (Just (Left ex))
-- Just (Right r) -> pure (Just (Right (Left r)))
-- Nothing -> peekSTM y >>= \case
-- Just (Left ex) -> pure (Just (Left ex))
-- Just (Right r) -> pure (Just (Right (Right r)))
-- Nothing -> pure Nothing
awaitEither :: (IsAwaitable ra a , IsAwaitable rb b, MonadIO m) => a -> b -> m (Awaitable (Either ra rb))
awaitEither x y = liftIO $ do
let startX = runAwaitable x peekStep
let startY = runAwaitable y peekStep
pure $ Awaitable $ \querySTM -> groupLefts <$> stepBoth startX startY querySTM
where
stepBoth :: Monad m => AwaitableStepM ra -> AwaitableStepM rb -> (forall c. STM (Maybe c) -> m c) -> m (Either ra rb)
stepBoth (AwaitableCompleted resultX) _ _ = pure $ Left resultX
stepBoth _ (AwaitableCompleted resultY) _ = pure $ Right resultY
stepBoth stepX@(AwaitableStep transactionX nextX) stepY@(AwaitableStep transactionY nextY) querySTM = do
querySTM (peekEitherSTM transactionX transactionY) >>= \case
Left resultX -> stepBoth (nextX resultX) stepY querySTM
Right resultY -> stepBoth stepX (nextY resultY) querySTM
groupLefts :: Either (Either ex a) (Either ex b) -> Either ex (Either a b)
groupLefts (Left x) = Left <$> x
groupLefts (Right y) = Right <$> y
peekEitherSTM :: STM (Maybe a) -> STM (Maybe b) -> STM (Maybe (Either a b))
peekEitherSTM x y =
x >>= \case
Just r -> pure (Just (Left r))
Nothing -> y >>= \case
Just r -> pure (Just (Right r))
Nothing -> pure Nothing
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment