Skip to content
Snippets Groups Projects
Commit df4a6f56 authored by Jens Nolte's avatar Jens Nolte
Browse files

Further improve stepAsyncIO control flow

No longer spawns unnecessary tasks when the left side of a bind or catch
has to await results.
parent 50b1ea01
No related branches found
No related tags found
No related merge requests found
Pipeline #2340 failed
...@@ -76,24 +76,41 @@ runAsyncIO = withDefaultPool ...@@ -76,24 +76,41 @@ runAsyncIO = withDefaultPool
runOnPool :: Pool -> AsyncIO r -> AsyncIO r runOnPool :: Pool -> AsyncIO r -> AsyncIO r
runOnPool pool work = await =<< (liftIO . atomically $ queueWork pool work) runOnPool pool work = await =<< (liftIO . atomically $ queueWork pool work)
data AsyncCall r = forall a. AsyncCall (Awaitable a) (Either SomeException a -> AsyncIO r)
asyncCall :: Awaitable a -> (Either SomeException a -> AsyncIO r) -> AsyncCall r
asyncCall = AsyncCall
-- | An AsyncIO operation that can be continued after an Awaitable is completed
data BlockedAsyncIO r = forall a. BlockedAsyncIO (Awaitable a) (Either SomeException a -> AsyncIO r)
blockedAsyncIO :: Awaitable a -> (a -> AsyncIO r) -> BlockedAsyncIO r
blockedAsyncIO input fn = BlockedAsyncIO input (either throwM fn)
blockedAsyncIOCatch :: forall e a r. Exception e => Awaitable r -> (e -> AsyncIO r) -> BlockedAsyncIO r
blockedAsyncIOCatch input handler = BlockedAsyncIO input (applyCatch handler)
where
applyCatch :: (MonadThrow m) => (e -> m r) -> Either SomeException r -> m r
applyCatch _ (Right x) = pure x
applyCatch handler (Left ex) = maybe (throwM ex) handler (fromException ex)
bindBlockedAsyncIO :: BlockedAsyncIO a -> (a -> AsyncIO r) -> BlockedAsyncIO r
bindBlockedAsyncIO (BlockedAsyncIO input fx) fy = BlockedAsyncIO input (fx >=> fy)
catchBlockedAsyncIO :: Exception e => BlockedAsyncIO r -> (e -> AsyncIO r) -> BlockedAsyncIO r
catchBlockedAsyncIO (BlockedAsyncIO input fx) handler = BlockedAsyncIO input (\x -> fx x `catch` handler)
peekBlockedAsyncIO :: BlockedAsyncIO r -> STM (Maybe (AsyncIO r))
peekBlockedAsyncIO (BlockedAsyncIO input fn) = fn <<$>> peekSTM input
-- The result of an AsyncIO operation that is executed until an incomplete Awaitable is encountered
data StepResult r data StepResult r
= StepResultCompleted r = StepResultCompleted r
| StepResultAwaitable (Awaitable r) | StepResultAwaitable (Awaitable r)
| StepResultAsyncCall (AsyncCall r) | StepResultBlocked (BlockedAsyncIO r)
-- Run an AsyncIO operation until an incomplete Awaitable is encountered
stepAsyncIO :: Pool -> AsyncIO r -> IO (StepResult r) stepAsyncIO :: Pool -> AsyncIO r -> IO (StepResult r)
stepAsyncIO pool = go stepAsyncIO pool = go
where where
--packResult :: Either SomeException (StepResult r) -> Either (AsyncCall r) (Awaitable r)
--packResult (Left ex) = Right (failedAwaitable ex)
--packResult (Right (StepResultCompleted x)) = Right (successfulAwaitable x)
--packResult (Right (StepResultAwaitable x)) = Right x
--packResult (Right (StepResultAsyncCall x)) = Left x
go :: AsyncIO r -> IO (StepResult r) go :: AsyncIO r -> IO (StepResult r)
go (AsyncIOCompleted x) = StepResultCompleted <$> either throwIO pure x go (AsyncIOCompleted x) = StepResultCompleted <$> either throwIO pure x
go (AsyncIOIO x) = StepResultCompleted <$> x go (AsyncIOIO x) = StepResultCompleted <$> x
...@@ -105,29 +122,16 @@ stepAsyncIO pool = go ...@@ -105,29 +122,16 @@ stepAsyncIO pool = go
go (AsyncIOBind x fn) = do go (AsyncIOBind x fn) = do
go x >>= \case go x >>= \case
StepResultCompleted r -> go (fn r) StepResultCompleted r -> go (fn r)
StepResultAwaitable awaitable -> pure $ StepResultAsyncCall (asyncCall awaitable foobar) StepResultAwaitable awaitable -> pure $ StepResultBlocked (blockedAsyncIO awaitable fn)
(StepResultAsyncCall call) -> continueAfterCall call foobar StepResultBlocked call -> pure $ StepResultBlocked (bindBlockedAsyncIO call fn)
where
foobar = either throwM fn
go (AsyncIOCatch x handler) = do go (AsyncIOCatch x handler) = do
try (go x) >>= \case try (go x) >>= \case
Left ex -> go (handler ex) Left ex -> go (handler ex)
Right (StepResultCompleted r) -> pure $ StepResultCompleted r Right (StepResultCompleted r) -> pure $ StepResultCompleted r
Right (StepResultAwaitable awaitable) -> pure $ StepResultAsyncCall (asyncCall awaitable foobar) Right (StepResultAwaitable awaitable) -> pure $ StepResultBlocked (blockedAsyncIOCatch awaitable handler)
Right (StepResultAsyncCall c) -> continueAfterCall c foobar Right (StepResultBlocked call) -> pure $ StepResultBlocked (catchBlockedAsyncIO call handler)
where
foobar = either (handleSomeException handler) pure
go AsyncIOAskPool = pure $ StepResultCompleted pool go AsyncIOAskPool = pure $ StepResultCompleted pool
continueAfterCall :: AsyncCall a -> (Either SomeException a -> AsyncIO r) -> IO (StepResult r)
continueAfterCall call fn = do
-- Tail call optimization is not possible when having to wait for the result of a call, so the call is queued as a new work item.
awaitable <- atomically $ queueBlockedWork pool call
pure $ StepResultAsyncCall $ asyncCall awaitable fn
handleSomeException :: forall e a m. (Exception e, MonadThrow m) => (e -> m a) -> SomeException -> m a
handleSomeException handler ex = maybe (throwM ex) handler (fromException ex)
awaitResult :: AsyncIO (Awaitable r) -> AsyncIO r awaitResult :: AsyncIO (Awaitable r) -> AsyncIO r
awaitResult = (await =<<) awaitResult = (await =<<)
...@@ -147,7 +151,7 @@ newtype AsyncTask r = AsyncTask (Awaitable r) ...@@ -147,7 +151,7 @@ newtype AsyncTask r = AsyncTask (Awaitable r)
instance IsAwaitable r (AsyncTask r) where instance IsAwaitable r (AsyncTask r) where
toAwaitable (AsyncTask awaitable) = awaitable toAwaitable (AsyncTask awaitable) = awaitable
data AsyncWorkItem = forall r. AsyncWorkItem (AsyncCall r) (Awaitable r -> IO ()) data AsyncWorkItem = forall r. AsyncWorkItem (BlockedAsyncIO r) (Awaitable r -> IO ())
newtype AsyncWorkResult r = AsyncWorkResult (TMVar (Awaitable r)) newtype AsyncWorkResult r = AsyncWorkResult (TMVar (Awaitable r))
instance IsAwaitable r (AsyncWorkResult r) where instance IsAwaitable r (AsyncWorkResult r) where
...@@ -211,26 +215,26 @@ queueWorkItem pool item = do ...@@ -211,26 +215,26 @@ queueWorkItem pool item = do
modifyTVar' (queue pool) (|> item) modifyTVar' (queue pool) (|> item)
queueWork :: Pool -> AsyncIO r -> STM (Awaitable r) queueWork :: Pool -> AsyncIO r -> STM (Awaitable r)
queueWork pool work = queueBlockedWork pool $ asyncCall (successfulAwaitable ()) (const work) queueWork pool work = queueBlockedWork pool $ blockedAsyncIO (successfulAwaitable ()) (const work)
queueBlockedWork :: Pool -> AsyncCall r -> STM (Awaitable r) queueBlockedWork :: Pool -> BlockedAsyncIO r -> STM (Awaitable r)
queueBlockedWork pool call = do queueBlockedWork pool blocked = do
resultVar <- AsyncWorkResult <$> newEmptyTMVar resultVar <- AsyncWorkResult <$> newEmptyTMVar
queueWorkItem pool $ AsyncWorkItem call (completeWork resultVar) queueWorkItem pool $ AsyncWorkItem blocked (completeWork resultVar)
pure $ toAwaitable resultVar pure $ toAwaitable resultVar
toWorker :: Pool -> AsyncWorkItem -> STM (Maybe (IO ())) toWorker :: Pool -> AsyncWorkItem -> STM (Maybe (IO ()))
toWorker pool (AsyncWorkItem (AsyncCall inputAwaitable work) putResult) = worker <<$>> peekSTM inputAwaitable toWorker pool (AsyncWorkItem blocked putResult) = worker <<$>> peekBlockedAsyncIO blocked
where where
worker input = do worker work = do
threadId <- myThreadId threadId <- myThreadId
atomically $ modifyTVar (threads pool) $ insert threadId atomically $ modifyTVar (threads pool) $ insert threadId
stepAsyncIO pool (work input) >>= \case stepAsyncIO pool work >>= \case
StepResultCompleted r -> putResult (successfulAwaitable r) StepResultCompleted r -> putResult (successfulAwaitable r)
StepResultAwaitable x -> putResult x StepResultAwaitable awaitable -> putResult awaitable
-- This is an async tail call. Tail call optimization is performed by reusing `putResult`. -- This is an async tail call. Tail call optimization is performed by reusing `putResult`.
StepResultAsyncCall call -> atomically $ queueWorkItem pool (AsyncWorkItem call putResult) StepResultBlocked blockedTail -> atomically $ queueWorkItem pool (AsyncWorkItem blockedTail putResult)
atomically . modifyTVar (threads pool) . delete =<< myThreadId atomically . modifyTVar (threads pool) . delete =<< myThreadId
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment