From df4a6f5612f0f65650b214b23a4db7b47ef71035 Mon Sep 17 00:00:00 2001 From: Jens Nolte <git@queezle.net> Date: Tue, 27 Jul 2021 21:33:10 +0200 Subject: [PATCH] Further improve stepAsyncIO control flow No longer spawns unnecessary tasks when the left side of a bind or catch has to await results. --- src/Quasar/Core.hs | 78 ++++++++++++++++++++++++---------------------- 1 file changed, 41 insertions(+), 37 deletions(-) diff --git a/src/Quasar/Core.hs b/src/Quasar/Core.hs index 15e3554..8bea63f 100644 --- a/src/Quasar/Core.hs +++ b/src/Quasar/Core.hs @@ -76,24 +76,41 @@ runAsyncIO = withDefaultPool runOnPool :: Pool -> AsyncIO r -> AsyncIO r runOnPool pool work = await =<< (liftIO . atomically $ queueWork pool work) -data AsyncCall r = forall a. AsyncCall (Awaitable a) (Either SomeException a -> AsyncIO r) -asyncCall :: Awaitable a -> (Either SomeException a -> AsyncIO r) -> AsyncCall r -asyncCall = AsyncCall +-- | An AsyncIO operation that can be continued after an Awaitable is completed +data BlockedAsyncIO r = forall a. BlockedAsyncIO (Awaitable a) (Either SomeException a -> AsyncIO r) + +blockedAsyncIO :: Awaitable a -> (a -> AsyncIO r) -> BlockedAsyncIO r +blockedAsyncIO input fn = BlockedAsyncIO input (either throwM fn) + +blockedAsyncIOCatch :: forall e a r. Exception e => Awaitable r -> (e -> AsyncIO r) -> BlockedAsyncIO r +blockedAsyncIOCatch input handler = BlockedAsyncIO input (applyCatch handler) + where + applyCatch :: (MonadThrow m) => (e -> m r) -> Either SomeException r -> m r + applyCatch _ (Right x) = pure x + applyCatch handler (Left ex) = maybe (throwM ex) handler (fromException ex) + + +bindBlockedAsyncIO :: BlockedAsyncIO a -> (a -> AsyncIO r) -> BlockedAsyncIO r +bindBlockedAsyncIO (BlockedAsyncIO input fx) fy = BlockedAsyncIO input (fx >=> fy) + +catchBlockedAsyncIO :: Exception e => BlockedAsyncIO r -> (e -> AsyncIO r) -> BlockedAsyncIO r +catchBlockedAsyncIO (BlockedAsyncIO input fx) handler = BlockedAsyncIO input (\x -> fx x `catch` handler) + +peekBlockedAsyncIO :: BlockedAsyncIO r -> STM (Maybe (AsyncIO r)) +peekBlockedAsyncIO (BlockedAsyncIO input fn) = fn <<$>> peekSTM input + + +-- The result of an AsyncIO operation that is executed until an incomplete Awaitable is encountered data StepResult r = StepResultCompleted r | StepResultAwaitable (Awaitable r) - | StepResultAsyncCall (AsyncCall r) + | StepResultBlocked (BlockedAsyncIO r) +-- Run an AsyncIO operation until an incomplete Awaitable is encountered stepAsyncIO :: Pool -> AsyncIO r -> IO (StepResult r) stepAsyncIO pool = go where - --packResult :: Either SomeException (StepResult r) -> Either (AsyncCall r) (Awaitable r) - --packResult (Left ex) = Right (failedAwaitable ex) - --packResult (Right (StepResultCompleted x)) = Right (successfulAwaitable x) - --packResult (Right (StepResultAwaitable x)) = Right x - --packResult (Right (StepResultAsyncCall x)) = Left x - go :: AsyncIO r -> IO (StepResult r) go (AsyncIOCompleted x) = StepResultCompleted <$> either throwIO pure x go (AsyncIOIO x) = StepResultCompleted <$> x @@ -105,29 +122,16 @@ stepAsyncIO pool = go go (AsyncIOBind x fn) = do go x >>= \case StepResultCompleted r -> go (fn r) - StepResultAwaitable awaitable -> pure $ StepResultAsyncCall (asyncCall awaitable foobar) - (StepResultAsyncCall call) -> continueAfterCall call foobar - where - foobar = either throwM fn + StepResultAwaitable awaitable -> pure $ StepResultBlocked (blockedAsyncIO awaitable fn) + StepResultBlocked call -> pure $ StepResultBlocked (bindBlockedAsyncIO call fn) go (AsyncIOCatch x handler) = do try (go x) >>= \case Left ex -> go (handler ex) Right (StepResultCompleted r) -> pure $ StepResultCompleted r - Right (StepResultAwaitable awaitable) -> pure $ StepResultAsyncCall (asyncCall awaitable foobar) - Right (StepResultAsyncCall c) -> continueAfterCall c foobar - where - foobar = either (handleSomeException handler) pure + Right (StepResultAwaitable awaitable) -> pure $ StepResultBlocked (blockedAsyncIOCatch awaitable handler) + Right (StepResultBlocked call) -> pure $ StepResultBlocked (catchBlockedAsyncIO call handler) go AsyncIOAskPool = pure $ StepResultCompleted pool - continueAfterCall :: AsyncCall a -> (Either SomeException a -> AsyncIO r) -> IO (StepResult r) - continueAfterCall call fn = do - -- Tail call optimization is not possible when having to wait for the result of a call, so the call is queued as a new work item. - awaitable <- atomically $ queueBlockedWork pool call - pure $ StepResultAsyncCall $ asyncCall awaitable fn - - handleSomeException :: forall e a m. (Exception e, MonadThrow m) => (e -> m a) -> SomeException -> m a - handleSomeException handler ex = maybe (throwM ex) handler (fromException ex) - awaitResult :: AsyncIO (Awaitable r) -> AsyncIO r awaitResult = (await =<<) @@ -147,7 +151,7 @@ newtype AsyncTask r = AsyncTask (Awaitable r) instance IsAwaitable r (AsyncTask r) where toAwaitable (AsyncTask awaitable) = awaitable -data AsyncWorkItem = forall r. AsyncWorkItem (AsyncCall r) (Awaitable r -> IO ()) +data AsyncWorkItem = forall r. AsyncWorkItem (BlockedAsyncIO r) (Awaitable r -> IO ()) newtype AsyncWorkResult r = AsyncWorkResult (TMVar (Awaitable r)) instance IsAwaitable r (AsyncWorkResult r) where @@ -211,26 +215,26 @@ queueWorkItem pool item = do modifyTVar' (queue pool) (|> item) queueWork :: Pool -> AsyncIO r -> STM (Awaitable r) -queueWork pool work = queueBlockedWork pool $ asyncCall (successfulAwaitable ()) (const work) +queueWork pool work = queueBlockedWork pool $ blockedAsyncIO (successfulAwaitable ()) (const work) -queueBlockedWork :: Pool -> AsyncCall r -> STM (Awaitable r) -queueBlockedWork pool call = do +queueBlockedWork :: Pool -> BlockedAsyncIO r -> STM (Awaitable r) +queueBlockedWork pool blocked = do resultVar <- AsyncWorkResult <$> newEmptyTMVar - queueWorkItem pool $ AsyncWorkItem call (completeWork resultVar) + queueWorkItem pool $ AsyncWorkItem blocked (completeWork resultVar) pure $ toAwaitable resultVar toWorker :: Pool -> AsyncWorkItem -> STM (Maybe (IO ())) -toWorker pool (AsyncWorkItem (AsyncCall inputAwaitable work) putResult) = worker <<$>> peekSTM inputAwaitable +toWorker pool (AsyncWorkItem blocked putResult) = worker <<$>> peekBlockedAsyncIO blocked where - worker input = do + worker work = do threadId <- myThreadId atomically $ modifyTVar (threads pool) $ insert threadId - stepAsyncIO pool (work input) >>= \case + stepAsyncIO pool work >>= \case StepResultCompleted r -> putResult (successfulAwaitable r) - StepResultAwaitable x -> putResult x + StepResultAwaitable awaitable -> putResult awaitable -- This is an async tail call. Tail call optimization is performed by reusing `putResult`. - StepResultAsyncCall call -> atomically $ queueWorkItem pool (AsyncWorkItem call putResult) + StepResultBlocked blockedTail -> atomically $ queueWorkItem pool (AsyncWorkItem blockedTail putResult) atomically . modifyTVar (threads pool) . delete =<< myThreadId -- GitLab