diff --git a/src/Quasar/Async.hs b/src/Quasar/Async.hs index cb143e47e7c9f4011493e68a7de4743fd73837e4..fdfd3189e9fcdc074cea87e6c34df94c1bbf34b2 100644 --- a/src/Quasar/Async.hs +++ b/src/Quasar/Async.hs @@ -86,7 +86,7 @@ asyncWithUnmask action = do Just threadId -> throwTo threadId CancelTask -- Wait for task completion or failure. Tasks must not ignore `CancelTask` or this will hang. - pure $ mapAwaitable (const $ pure ()) resultVar + pure $ void (toAwaitable resultVar) `catchAll` const (pure ()) liftUnmask :: (IO a -> IO a) -> AsyncIO a -> AsyncIO a liftUnmask unmask action = do diff --git a/src/Quasar/Awaitable.hs b/src/Quasar/Awaitable.hs index 34054f0eb44aa8156dd2c897bde3d9955bdd94af..dd98b0a57f6e037c36bbff3492122f345da967fb 100644 --- a/src/Quasar/Awaitable.hs +++ b/src/Quasar/Awaitable.hs @@ -9,7 +9,6 @@ module Quasar.Awaitable ( failedAwaitable, completedAwaitable, simpleAwaitable, - mapAwaitable, -- * Awaiting multiple awaitables awaitEither, @@ -46,7 +45,7 @@ import Quasar.Prelude class IsAwaitable r a | a -> r where - runAwaitable :: (MonadQuerySTM m) => a -> m (Either SomeException r) + runAwaitable :: (MonadQuerySTM m) => a -> m r runAwaitable self = runAwaitable (toAwaitable self) cacheAwaitable :: MonadIO m => a -> m (Awaitable r) @@ -59,15 +58,16 @@ class IsAwaitable r a | a -> r where awaitIO :: (IsAwaitable r a, MonadIO m) => a -> m r -awaitIO awaitable = liftIO $ either throwIO pure =<< - runQueryT (atomically . (maybe retry pure =<<)) (runAwaitable awaitable) +awaitIO awaitable = liftIO $ runQueryT atomically (runAwaitable awaitable) tryAwaitIO :: (IsAwaitable r a, MonadIO m) => a -> m (Either SomeException r) -tryAwaitIO awaitable = liftIO $ fmap join $ try $ - runQueryT (atomically . (maybe retry pure =<<)) (runAwaitable awaitable) +tryAwaitIO awaitable = liftIO $ try $ awaitIO awaitable peekAwaitable :: (IsAwaitable r a, MonadIO m) => a -> m (Maybe (Either SomeException r)) -peekAwaitable awaitable = liftIO $ runMaybeT $ runQueryT (MaybeT . atomically) (runAwaitable awaitable) +peekAwaitable awaitable = liftIO $ runMaybeT $ try $ runQueryT queryFn (runAwaitable awaitable) + where + queryFn :: STM a -> MaybeT IO a + queryFn transaction = MaybeT $ atomically $ (Just <$> transaction) `orElse` pure Nothing data Awaitable r = forall a. IsAwaitable r a => Awaitable a @@ -78,17 +78,14 @@ instance IsAwaitable r (Awaitable r) where toAwaitable = id instance Functor Awaitable where - fmap fn (Awaitable x) = fnAwaitable $ fn <<$>> runAwaitable x + fmap fn (Awaitable x) = fnAwaitable $ fn <$> runAwaitable x instance Applicative Awaitable where - pure value = fnAwaitable $ pure (Right value) - liftA2 fn (Awaitable fx) (Awaitable fy) = fnAwaitable $ liftA2 (liftA2 fn) (runAwaitable fx) (runAwaitable fy) + pure value = fnAwaitable $ pure value + liftA2 fn (Awaitable fx) (Awaitable fy) = fnAwaitable $ liftA2 fn (runAwaitable fx) (runAwaitable fy) instance Monad Awaitable where - (Awaitable fx) >>= fn = fnAwaitable $ do - runAwaitable fx >>= \case - Left ex -> pure $ Left ex - Right x -> runAwaitable (fn x) + (Awaitable fx) >>= fn = fnAwaitable $ runAwaitable fx >>= runAwaitable . fn instance Semigroup r => Semigroup (Awaitable r) where x <> y = liftA2 (<>) x y @@ -101,9 +98,7 @@ instance MonadThrow Awaitable where instance MonadCatch Awaitable where catch awaitable handler = fnAwaitable do - runAwaitable awaitable >>= \case - l@(Left ex) -> maybe (pure l) (runAwaitable . handler) $ fromException ex - Right value -> pure $ Right value + runAwaitable awaitable `catch` \ex -> runAwaitable (handler ex) instance MonadFail Awaitable where fail = throwM . userError @@ -117,20 +112,20 @@ instance MonadPlus Awaitable -newtype FnAwaitable r = FnAwaitable (forall m. (MonadQuerySTM m) => m (Either SomeException r)) +newtype FnAwaitable r = FnAwaitable (forall m. (MonadQuerySTM m) => m r) instance IsAwaitable r (FnAwaitable r) where runAwaitable (FnAwaitable x) = x cacheAwaitable = cacheAwaitableDefaultImplementation -fnAwaitable :: (forall m. (MonadQuerySTM m) => m (Either SomeException r)) -> Awaitable r +fnAwaitable :: (forall m. (MonadQuerySTM m) => m r) -> Awaitable r fnAwaitable fn = toAwaitable $ FnAwaitable fn newtype CompletedAwaitable r = CompletedAwaitable (Either SomeException r) instance IsAwaitable r (CompletedAwaitable r) where - runAwaitable (CompletedAwaitable x) = pure x + runAwaitable (CompletedAwaitable x) = either throwM pure x cacheAwaitable = pure . toAwaitable @@ -143,81 +138,72 @@ successfulAwaitable = completedAwaitable . Right failedAwaitable :: SomeException -> Awaitable r failedAwaitable = completedAwaitable . Left --- | Create an awaitable from an `STM` transaction. +-- | Create an awaitable from an `STM` transaction. The STM transaction should not have visible side effects. -- -- Use `retry` to signal that the awaitable is not yet completed and `throwM`/`throwSTM` to set the awaitable to failed. simpleAwaitable :: STM a -> Awaitable a -simpleAwaitable query = fnAwaitable $ querySTM do - (Right <$> query) - `catchAll` - \ex -> pure (Left ex) - -mapAwaitable :: IsAwaitable i a => (Either SomeException i -> Either SomeException r) -> a -> Awaitable r -mapAwaitable fn awaitable = fnAwaitable $ fn <$> runAwaitable awaitable +simpleAwaitable query = fnAwaitable $ querySTM query -class MonadThrow m => MonadQuerySTM m where - -- | Run an `STM` transaction. `retry` can be used. +class MonadCatch m => MonadQuerySTM m where + -- | Run an `STM` transaction. Use `retry` to signal that no value is available (yet). querySTM :: (forall a. STM a -> m a) - querySTM transaction = unsafeQuerySTM $ (Just <$> transaction) `orElse` pure Nothing - -- | Run an "STM` transaction. `retry` MUST NOT be used. - unsafeQuerySTM :: (forall a. STM (Maybe a) -> m a) - unsafeQuerySTM transaction = querySTM $ maybe retry pure =<< transaction - {-# MINIMAL querySTM | unsafeQuerySTM #-} +-- | Run an `STM` transaction. Use `retry` to signal that no value is available (yet). +tryQuerySTM :: MonadQuerySTM m => STM a -> m (Either SomeException a) +tryQuerySTM transaction = querySTM (try transaction) -instance MonadThrow m => MonadQuerySTM (ReaderT (QueryFn m) m) where - unsafeQuerySTM query = do + +instance MonadCatch m => MonadQuerySTM (ReaderT (QueryFn m) m) where + querySTM query = do QueryFn querySTMFn <- ask lift $ querySTMFn query -newtype QueryFn m = QueryFn (forall a. STM (Maybe a) -> m a) +newtype QueryFn m = QueryFn (forall a. STM a -> m a) -runQueryT :: forall m a. (forall b. STM (Maybe b) -> m b) -> ReaderT (QueryFn m) m a -> m a +runQueryT :: forall m a. (forall b. STM b -> m b) -> ReaderT (QueryFn m) m a -> m a runQueryT queryFn action = runReaderT action (QueryFn queryFn) -newtype CachedAwaitable r = CachedAwaitable (TVar (AwaitableStepM (Either SomeException r))) +newtype CachedAwaitable r = CachedAwaitable (TVar (AwaitableStepM r)) cacheAwaitableDefaultImplementation :: (IsAwaitable r a, MonadIO m) => a -> m (Awaitable r) cacheAwaitableDefaultImplementation awaitable = toAwaitable . CachedAwaitable <$> liftIO (newTVarIO (runAwaitable awaitable)) instance IsAwaitable r (CachedAwaitable r) where - runAwaitable :: forall m. (MonadQuerySTM m) => CachedAwaitable r -> m (Either SomeException r) + runAwaitable :: forall m. (MonadQuerySTM m) => CachedAwaitable r -> m r runAwaitable (CachedAwaitable tvar) = go where - go :: m (Either SomeException r) - go = unsafeQuerySTM stepCacheTransaction >>= \case + go :: m r + go = querySTM stepCacheTransaction >>= \case AwaitableCompleted result -> pure result - AwaitableFailed ex -> pure (Left ex) + AwaitableFailed ex -> throwM ex -- Cached operation is not yet completed _ -> go - stepCacheTransaction :: STM (Maybe (AwaitableStepM (Either SomeException r))) + stepCacheTransaction :: STM (AwaitableStepM r) stepCacheTransaction = do readTVar tvar >>= \case - -- Cache was already completed - result@(AwaitableCompleted _) -> pure $ Just result - result@(AwaitableFailed _) -> pure $ Just result + -- Cache needs to be stepped AwaitableStep query fn -> do -- Run the next "querySTM" query requested by the cached operation - queryResult <- (fn <<$>> query) `catchAll` (pure . Just . AwaitableFailed) - case queryResult of - -- In case of an incomplete query the caller (/ the monad `m`) can decide what to do (e.g. retry for - -- `awaitIO`, abort for `peekAwaitable`) - Nothing -> pure Nothing - -- Query was successful. Update cache and exit query - Just nextStep -> do - writeTVar tvar nextStep - pure $ Just nextStep + -- The query might `retry`, which is ok here + nextStep <- fn <$> try query + -- In case of an incomplete query the caller (/ the monad `m`) can decide what to do (e.g. retry for + -- `awaitIO`, abort for `peekAwaitable`) + -- Query was successful. Update cache and exit query + writeTVar tvar nextStep + pure nextStep + -- Cache was already completed + result -> pure result cacheAwaitable = pure . toAwaitable data AwaitableStepM a = AwaitableCompleted a | AwaitableFailed SomeException - | forall b. AwaitableStep (STM (Maybe b)) (b -> AwaitableStepM a) + | forall b. AwaitableStep (STM b) (Either SomeException b -> AwaitableStepM a) instance Functor AwaitableStepM where fmap fn (AwaitableCompleted x) = AwaitableCompleted (fn x) @@ -234,11 +220,16 @@ instance Monad AwaitableStepM where (AwaitableStep query next) >>= fn = AwaitableStep query (next >=> fn) instance MonadQuerySTM AwaitableStepM where - unsafeQuerySTM query = AwaitableStep query AwaitableCompleted + querySTM query = AwaitableStep query (either AwaitableFailed AwaitableCompleted) instance MonadThrow AwaitableStepM where throwM = AwaitableFailed . toException +instance MonadCatch AwaitableStepM where + catch result@(AwaitableCompleted _) _ = result + catch result@(AwaitableFailed ex) handler = maybe result handler $ fromException ex + catch (AwaitableStep query next) handler = AwaitableStep query (\x -> next x `catch` handler) + -- ** AsyncVar @@ -246,7 +237,8 @@ instance MonadThrow AwaitableStepM where newtype AsyncVar r = AsyncVar (TMVar (Either SomeException r)) instance IsAwaitable r (AsyncVar r) where - runAwaitable (AsyncVar var) = unsafeQuerySTM $ tryReadTMVar var + runAwaitable (AsyncVar var) = querySTM $ either throwM pure =<< readTMVar var + -- An AsyncVar is a primitive awaitable, so caching is not necessary cacheAwaitable = pure . toAwaitable @@ -287,7 +279,7 @@ putAsyncVarEitherSTM_ var = void . putAsyncVarEitherSTM var -- * Awaiting multiple asyncs awaitEither :: (IsAwaitable ra a, IsAwaitable rb b) => a -> b -> Awaitable (Either ra rb) -awaitEither x y = fnAwaitable $ groupLefts <$> stepBoth (runAwaitable x) (runAwaitable y) +awaitEither x y = fnAwaitable $ stepBoth (runAwaitable x) (runAwaitable y) where stepBoth :: MonadQuerySTM m => AwaitableStepM ra -> AwaitableStepM rb -> m (Either ra rb) stepBoth (AwaitableCompleted resultX) _ = pure $ Left resultX @@ -295,7 +287,7 @@ awaitEither x y = fnAwaitable $ groupLefts <$> stepBoth (runAwaitable x) (runAwa stepBoth _ (AwaitableCompleted resultY) = pure $ Right resultY stepBoth _ (AwaitableFailed ex) = throwM ex stepBoth stepX@(AwaitableStep transactionX nextX) stepY@(AwaitableStep transactionY nextY) = do - unsafeQuerySTM (peekEitherSTM transactionX transactionY) >>= \case + querySTM (eitherSTM (try transactionX) (try transactionY)) >>= \case Left resultX -> stepBoth (nextX resultX) stepY Right resultY -> stepBoth stepX (nextY resultY) @@ -305,7 +297,7 @@ awaitAny xs = fnAwaitable $ stepAll Empty Empty $ runAwaitable <$> fromList (toL where stepAll :: MonadQuerySTM m - => Seq (STM (Maybe (Seq (AwaitableStepM r)))) + => Seq (STM (Seq (AwaitableStepM r))) -> Seq (AwaitableStepM r) -> Seq (AwaitableStepM r) -> m r @@ -313,11 +305,11 @@ awaitAny xs = fnAwaitable $ stepAll Empty Empty $ runAwaitable <$> fromList (toL stepAll _ _ (AwaitableFailed ex :<| _) = throwM ex stepAll acc prevSteps (step@(AwaitableStep transaction next) :<| steps) = stepAll - do acc |> ((\result -> (prevSteps |> next result) <> steps) <<$>> transaction) + do acc |> ((\result -> (prevSteps |> next result) <> steps) <$> try transaction) do prevSteps |> step steps stepAll acc _ Empty = do - newAwaitableSteps <- unsafeQuerySTM $ maybe impossibleCodePathM peekAnySTM $ nonEmpty (toList acc) + newAwaitableSteps <- querySTM $ maybe impossibleCodePathM anySTM $ nonEmpty (toList acc) stepAll Empty Empty newAwaitableSteps @@ -325,19 +317,8 @@ awaitAny2 :: IsAwaitable r a => a -> a -> Awaitable r awaitAny2 x y = awaitAny (x :| [y]) -groupLefts :: Either (Either ex a) (Either ex b) -> Either ex (Either a b) -groupLefts (Left x) = Left <$> x -groupLefts (Right y) = Right <$> y - -peekEitherSTM :: STM (Maybe a) -> STM (Maybe b) -> STM (Maybe (Either a b)) -peekEitherSTM x y = - x >>= \case - Just r -> pure (Just (Left r)) - Nothing -> y >>= \case - Just r -> pure (Just (Right r)) - Nothing -> pure Nothing +eitherSTM :: STM a -> STM b -> STM (Either a b) +eitherSTM x y = fmap Left x `orElse` fmap Right y -peekAnySTM :: NonEmpty (STM (Maybe a)) -> STM (Maybe a) -peekAnySTM (x :| xs) = x >>= \case - r@(Just _) -> pure r - Nothing -> maybe (pure Nothing) peekAnySTM (nonEmpty xs) +anySTM :: NonEmpty (STM a) -> STM a +anySTM (x :| xs) = x `orElse` maybe retry anySTM (nonEmpty xs)