diff --git a/src/Quasar/Awaitable.hs b/src/Quasar/Awaitable.hs index cecbcd7d62cb10fa8d6a92a2b6c6524448a551b8..47276f4f73a895f50c1b7e03c797429f31e21dd5 100644 --- a/src/Quasar/Awaitable.hs +++ b/src/Quasar/Awaitable.hs @@ -59,10 +59,12 @@ class IsAwaitable r a | a -> r where awaitIO :: (IsAwaitable r a, MonadIO m) => a -> m r -awaitIO awaitable = liftIO $ either throwIO pure =<< tryAwaitIO awaitable +awaitIO awaitable = liftIO $ either throwIO pure =<< + runQueryT (atomically . (maybe retry pure =<<)) (runAwaitable awaitable) tryAwaitIO :: (IsAwaitable r a, MonadIO m) => a -> m (Either SomeException r) -tryAwaitIO awaitable = liftIO $ runQueryT (atomically . (maybe retry pure =<<)) (runAwaitable awaitable) +tryAwaitIO awaitable = liftIO $ fmap join $ try $ + runQueryT (atomically . (maybe retry pure =<<)) (runAwaitable awaitable) peekAwaitable :: (IsAwaitable r a, MonadIO m) => a -> m (Maybe (Either SomeException r)) peekAwaitable awaitable = liftIO $ runMaybeT $ runQueryT (MaybeT . atomically) (runAwaitable awaitable) @@ -151,7 +153,7 @@ mapAwaitable :: IsAwaitable i a => (Either SomeException i -> Either SomeExcepti mapAwaitable fn awaitable = toAwaitable $ FnAwaitable $ fn <$> runAwaitable awaitable -class Monad m => MonadQuerySTM m where +class MonadThrow m => MonadQuerySTM m where -- | Run an `STM` transaction. `retry` can be used. querySTM :: (forall a. STM a -> m a) querySTM transaction = unsafeQuerySTM $ (Just <$> transaction) `orElse` pure Nothing @@ -162,7 +164,7 @@ class Monad m => MonadQuerySTM m where {-# MINIMAL querySTM | unsafeQuerySTM #-} -instance Monad m => MonadQuerySTM (ReaderT (QueryFn m) m) where +instance MonadThrow m => MonadQuerySTM (ReaderT (QueryFn m) m) where unsafeQuerySTM query = do QueryFn querySTMFn <- ask lift $ querySTMFn query @@ -185,6 +187,7 @@ instance IsAwaitable r (CachedAwaitable r) where go :: m (Either SomeException r) go = unsafeQuerySTM stepCacheTransaction >>= \case AwaitableCompleted result -> pure result + AwaitableFailed ex -> pure (Left ex) -- Cached operation is not yet completed _ -> go @@ -193,9 +196,11 @@ instance IsAwaitable r (CachedAwaitable r) where readTVar tvar >>= \case -- Cache was already completed result@(AwaitableCompleted _) -> pure $ Just result + result@(AwaitableFailed _) -> pure $ Just result AwaitableStep query fn -> do -- Run the next "querySTM" query requested by the cached operation - fn <<$>> query >>= \case + queryResult <- (fn <<$>> query) `catchAll` (pure . Just . AwaitableFailed) + case queryResult of -- In case of an incomplete query the caller (/ the monad `m`) can decide what to do (e.g. retry for -- `awaitIO`, abort for `peekAwaitable`) Nothing -> pure Nothing @@ -208,10 +213,12 @@ instance IsAwaitable r (CachedAwaitable r) where data AwaitableStepM a = AwaitableCompleted a + | AwaitableFailed SomeException | forall b. AwaitableStep (STM (Maybe b)) (b -> AwaitableStepM a) instance Functor AwaitableStepM where fmap fn (AwaitableCompleted x) = AwaitableCompleted (fn x) + fmap fn (AwaitableFailed ex) = AwaitableFailed ex fmap fn (AwaitableStep query next) = AwaitableStep query (fmap fn <$> next) instance Applicative AwaitableStepM where @@ -220,11 +227,15 @@ instance Applicative AwaitableStepM where instance Monad AwaitableStepM where (AwaitableCompleted x) >>= fn = fn x + (AwaitableFailed ex) >>= fn = AwaitableFailed ex (AwaitableStep query next) >>= fn = AwaitableStep query (next >=> fn) instance MonadQuerySTM AwaitableStepM where unsafeQuerySTM query = AwaitableStep query AwaitableCompleted +instance MonadThrow AwaitableStepM where + throwM = AwaitableFailed . toException + -- ** AsyncVar @@ -277,7 +288,9 @@ awaitEither x y = toAwaitable $ FnAwaitable $ groupLefts <$> stepBoth (runAwaita where stepBoth :: MonadQuerySTM m => AwaitableStepM ra -> AwaitableStepM rb -> m (Either ra rb) stepBoth (AwaitableCompleted resultX) _ = pure $ Left resultX + stepBoth (AwaitableFailed ex) _ = throwM ex stepBoth _ (AwaitableCompleted resultY) = pure $ Right resultY + stepBoth _ (AwaitableFailed ex) = throwM ex stepBoth stepX@(AwaitableStep transactionX nextX) stepY@(AwaitableStep transactionY nextY) = do unsafeQuerySTM (peekEitherSTM transactionX transactionY) >>= \case Left resultX -> stepBoth (nextX resultX) stepY @@ -294,12 +307,13 @@ awaitAny xs = toAwaitable $ FnAwaitable $ stepAll Empty Empty $ runAwaitable <$> -> Seq (AwaitableStepM r) -> m r stepAll _ _ (AwaitableCompleted result :<| _) = pure result + stepAll _ _ (AwaitableFailed ex :<| _) = throwM ex stepAll acc prevSteps (step@(AwaitableStep transaction next) :<| steps) = stepAll do acc |> ((\result -> (prevSteps |> next result) <> steps) <<$>> transaction) do prevSteps |> step steps - stepAll acc ps Empty = do + stepAll acc _ Empty = do newAwaitableSteps <- unsafeQuerySTM $ maybe impossibleCodePathM peekAnySTM $ nonEmpty (toList acc) stepAll Empty Empty newAwaitableSteps