diff --git a/src/Quasar/Awaitable.hs b/src/Quasar/Awaitable.hs index e9ee83601f4eed23f66c34e46acdb47117bcb3a7..97eafd3e806ad1cdc828e6d5f08fe988ec0e2a98 100644 --- a/src/Quasar/Awaitable.hs +++ b/src/Quasar/Awaitable.hs @@ -12,7 +12,6 @@ module Quasar.Awaitable ( mapAwaitable, -- * Awaiting multiple awaitables - cacheAwaitable, awaitEither, awaitAny, @@ -28,6 +27,9 @@ module Quasar.Awaitable ( failAsyncVar_, putAsyncVarEither_, putAsyncVarEitherSTM_, + + -- * Implementation helpers + cacheAwaitableDefaultImplementation, ) where import Control.Concurrent.STM @@ -44,10 +46,14 @@ class IsAwaitable r a | a -> r where runAwaitable :: (MonadQuerySTM m) => a -> m (Either SomeException r) runAwaitable self = runAwaitable (toAwaitable self) + cacheAwaitable :: MonadIO m => a -> m (Awaitable r) + cacheAwaitable self = cacheAwaitable (toAwaitable self) + toAwaitable :: a -> Awaitable r - toAwaitable x = Awaitable $ runAwaitable x + toAwaitable = Awaitable + - {-# MINIMAL toAwaitable | runAwaitable #-} + {-# MINIMAL toAwaitable | (runAwaitable, cacheAwaitable) #-} awaitIO :: (IsAwaitable r a, MonadIO m) => a -> m r @@ -57,22 +63,23 @@ peekAwaitable :: (IsAwaitable r a, MonadIO m) => a -> m (Maybe (Either SomeExcep peekAwaitable awaitable = liftIO $ runMaybeT $ runQueryT (MaybeT . atomically) (runAwaitable awaitable) -newtype Awaitable r = Awaitable (forall m. (MonadQuerySTM m) => m (Either SomeException r)) +data Awaitable r = forall a. IsAwaitable r a => Awaitable a instance IsAwaitable r (Awaitable r) where - runAwaitable (Awaitable x) = x + runAwaitable (Awaitable x) = runAwaitable x + cacheAwaitable (Awaitable x) = cacheAwaitable x toAwaitable = id instance Functor Awaitable where - fmap fn (Awaitable x) = Awaitable $ fn <<$>> x + fmap fn (Awaitable x) = toAwaitable $ FnAwaitable $ fn <<$>> runAwaitable x instance Applicative Awaitable where - pure value = Awaitable $ pure (Right value) - liftA2 fn (Awaitable fx) (Awaitable fy) = Awaitable $ liftA2 (liftA2 fn) fx fy + pure value = toAwaitable $ FnAwaitable $ pure (Right value) + liftA2 fn (Awaitable fx) (Awaitable fy) = toAwaitable $ FnAwaitable $ liftA2 (liftA2 fn) (runAwaitable fx) (runAwaitable fy) instance Monad Awaitable where - (Awaitable fx) >>= fn = Awaitable $ do - fx >>= \case + (Awaitable fx) >>= fn = toAwaitable $ FnAwaitable $ do + runAwaitable fx >>= \case Left ex -> pure $ Left ex Right x -> runAwaitable (fn x) @@ -83,9 +90,22 @@ instance Monoid r => Monoid (Awaitable r) where mempty = pure mempty +newtype FnAwaitable r = FnAwaitable (forall m. (MonadQuerySTM m) => m (Either SomeException r)) + +instance IsAwaitable r (FnAwaitable r) where + runAwaitable (FnAwaitable x) = x + cacheAwaitable = cacheAwaitableDefaultImplementation + + +newtype CompletedAwaitable r = CompletedAwaitable (Either SomeException r) + +instance IsAwaitable r (CompletedAwaitable r) where + runAwaitable (CompletedAwaitable x) = pure x + cacheAwaitable = pure . toAwaitable + completedAwaitable :: Either SomeException r -> Awaitable r -completedAwaitable result = Awaitable $ pure result +completedAwaitable result = toAwaitable $ CompletedAwaitable result successfulAwaitable :: r -> Awaitable r successfulAwaitable = completedAwaitable . Right @@ -94,10 +114,10 @@ failedAwaitable :: SomeException -> Awaitable r failedAwaitable = completedAwaitable . Left simpleAwaitable :: STM (Maybe (Either SomeException a)) -> Awaitable a -simpleAwaitable query = Awaitable (querySTM query) +simpleAwaitable query = toAwaitable $ FnAwaitable $ querySTM query mapAwaitable :: IsAwaitable i a => (Either SomeException i -> Either SomeException r) -> a -> Awaitable r -mapAwaitable fn awaitable = Awaitable $ fn <$> runAwaitable awaitable +mapAwaitable fn awaitable = toAwaitable $ FnAwaitable $ fn <$> runAwaitable awaitable class Monad m => MonadQuerySTM m where @@ -116,8 +136,8 @@ runQueryT queryFn action = runReaderT action (QueryFn queryFn) newtype CachedAwaitable r = CachedAwaitable (TVar (AwaitableStepM (Either SomeException r))) -cacheAwaitable :: Awaitable a -> IO (CachedAwaitable a) -cacheAwaitable awaitable = CachedAwaitable <$> newTVarIO (runAwaitable awaitable) +cacheAwaitableDefaultImplementation :: (IsAwaitable r a, MonadIO m) => a -> m (Awaitable r) +cacheAwaitableDefaultImplementation awaitable = toAwaitable . CachedAwaitable <$> liftIO (newTVarIO (runAwaitable awaitable)) instance IsAwaitable r (CachedAwaitable r) where runAwaitable :: forall m. (MonadQuerySTM m) => CachedAwaitable r -> m (Either SomeException r) @@ -137,13 +157,16 @@ instance IsAwaitable r (CachedAwaitable r) where AwaitableStep query fn -> do -- Run the next "querySTM" query requested by the cached operation fn <<$>> query >>= \case - -- In case of an incomplete query the caller (/ the monad `m`) can decide what to do (e.g. retry for `awaitIO`, abort for `peekAwaitable`) + -- In case of an incomplete query the caller (/ the monad `m`) can decide what to do (e.g. retry for + -- `awaitIO`, abort for `peekAwaitable`) Nothing -> pure Nothing -- Query was successful. Update cache and exit query Just nextStep -> do writeTVar tvar nextStep pure $ Just nextStep + cacheAwaitable = pure . toAwaitable + data AwaitableStepM a = AwaitableCompleted a | forall b. AwaitableStep (STM (Maybe b)) (b -> AwaitableStepM a) @@ -171,6 +194,7 @@ newtype AsyncVar r = AsyncVar (TMVar (Either SomeException r)) instance IsAwaitable r (AsyncVar r) where runAwaitable (AsyncVar var) = querySTM $ tryReadTMVar var + cacheAwaitable = pure . toAwaitable newAsyncVarSTM :: STM (AsyncVar r) @@ -209,8 +233,8 @@ putAsyncVarEitherSTM_ var = void . putAsyncVarEitherSTM var -- * Awaiting multiple asyncs -awaitEither :: (IsAwaitable ra a , IsAwaitable rb b, MonadIO m) => a -> b -> m (Awaitable (Either ra rb)) -awaitEither x y = pure $ Awaitable $ groupLefts <$> stepBoth (runAwaitable x) (runAwaitable y) +awaitEither :: (IsAwaitable ra a, IsAwaitable rb b) => a -> b -> Awaitable (Either ra rb) +awaitEither x y = toAwaitable $ FnAwaitable $ groupLefts <$> stepBoth (runAwaitable x) (runAwaitable y) where stepBoth :: MonadQuerySTM m => AwaitableStepM ra -> AwaitableStepM rb -> m (Either ra rb) stepBoth (AwaitableCompleted resultX) _ = pure $ Left resultX @@ -221,8 +245,8 @@ awaitEither x y = pure $ Awaitable $ groupLefts <$> stepBoth (runAwaitable x) (r Right resultY -> stepBoth stepX (nextY resultY) -awaitAny :: (IsAwaitable r a, MonadIO m) => NonEmpty a -> m (Awaitable r) -awaitAny xs = pure $ Awaitable $ stepAll Empty Empty $ runAwaitable <$> fromList (toList xs) +awaitAny :: IsAwaitable r a => NonEmpty a -> Awaitable r +awaitAny xs = toAwaitable $ FnAwaitable $ stepAll Empty Empty $ runAwaitable <$> fromList (toList xs) where stepAll :: MonadQuerySTM m @@ -253,7 +277,6 @@ peekEitherSTM x y = Just r -> pure (Just (Right r)) Nothing -> pure Nothing - peekAnySTM :: NonEmpty (STM (Maybe a)) -> STM (Maybe a) peekAnySTM (x :| xs) = x >>= \case r@(Just _) -> pure r diff --git a/src/Quasar/Disposable.hs b/src/Quasar/Disposable.hs index 25c768090a2633520651a044745f45d90a4ea95c..29d929124c91282f01e14f0ba839e12bdd1078d8 100644 --- a/src/Quasar/Disposable.hs +++ b/src/Quasar/Disposable.hs @@ -91,6 +91,8 @@ instance IsAwaitable () FnDisposable where -- Query if dispose is completed runAwaitable awaitable + cacheAwaitable = cacheAwaitableDefaultImplementation + data CombinedDisposable = CombinedDisposable Disposable Disposable