From 50b1ea018b0ead14f53767310f15eba9999b2a7a Mon Sep 17 00:00:00 2001 From: Jens Nolte <git@queezle.net> Date: Mon, 26 Jul 2021 22:08:00 +0200 Subject: [PATCH] Update stepAsyncIO for tail call optimization and actually start tasks --- quasar.cabal | 1 + src/Quasar/Core.hs | 197 ++++++++++++++---- .../Observable/ObservableHashMapSpec.hs | 1 - .../Observable/ObservablePrioritySpec.hs | 1 - 4 files changed, 153 insertions(+), 47 deletions(-) diff --git a/quasar.cabal b/quasar.cabal index bbcd9f4..6a2e1ca 100644 --- a/quasar.cabal +++ b/quasar.cabal @@ -65,6 +65,7 @@ library build-depends: base >=4.7 && <5, binary, + containers, exceptions, ghc-prim, hashable, diff --git a/src/Quasar/Core.hs b/src/Quasar/Core.hs index 82d68a1..15e3554 100644 --- a/src/Quasar/Core.hs +++ b/src/Quasar/Core.hs @@ -7,12 +7,12 @@ module Quasar.Core ( awaitResult, ) where -import Control.Concurrent (forkIOWithUnmask) +import Control.Concurrent (ThreadId, forkIO, forkIOWithUnmask, myThreadId) import Control.Concurrent.STM import Control.Exception (MaskingState(..), getMaskingState) import Control.Monad.Catch -import Data.Maybe (isJust) -import Data.Void (absurd) +import Data.HashSet +import Data.Sequence import Quasar.Awaitable import Quasar.Prelude @@ -71,39 +71,62 @@ await = AsyncIOAwait . toAwaitable -- | Run an `AsyncIO` to completion and return the result. runAsyncIO :: AsyncIO r -> IO r -runAsyncIO x = withDefaultPool $ \pool -> runAsyncIOWithPool pool x +runAsyncIO = withDefaultPool -runAsyncIOWithPool :: Pool -> AsyncIO r -> IO r -runAsyncIOWithPool pool x = do - stepResult <- stepAsyncIO pool x - case stepResult of - Left awaitable -> either throwIO pure =<< atomically (awaitSTM awaitable) - Right result -> pure result +runOnPool :: Pool -> AsyncIO r -> AsyncIO r +runOnPool pool work = await =<< (liftIO . atomically $ queueWork pool work) +data AsyncCall r = forall a. AsyncCall (Awaitable a) (Either SomeException a -> AsyncIO r) +asyncCall :: Awaitable a -> (Either SomeException a -> AsyncIO r) -> AsyncCall r +asyncCall = AsyncCall -stepAsyncIO :: Pool -> AsyncIO r -> IO (Either (Awaitable r) r) +data StepResult r + = StepResultCompleted r + | StepResultAwaitable (Awaitable r) + | StepResultAsyncCall (AsyncCall r) + +stepAsyncIO :: Pool -> AsyncIO r -> IO (StepResult r) stepAsyncIO pool = go where - go :: AsyncIO r -> IO (Either (Awaitable r) r) - go (AsyncIOCompleted x) = Right <$> either throwIO pure x - go (AsyncIOIO x) = Right <$> x - go (AsyncIOAwait x) = pure $ Left x + --packResult :: Either SomeException (StepResult r) -> Either (AsyncCall r) (Awaitable r) + --packResult (Left ex) = Right (failedAwaitable ex) + --packResult (Right (StepResultCompleted x)) = Right (successfulAwaitable x) + --packResult (Right (StepResultAwaitable x)) = Right x + --packResult (Right (StepResultAsyncCall x)) = Left x + + go :: AsyncIO r -> IO (StepResult r) + go (AsyncIOCompleted x) = StepResultCompleted <$> either throwIO pure x + go (AsyncIOIO x) = StepResultCompleted <$> x + go (AsyncIOAwait x) = + atomically (peekSTM x) >>= \case + Nothing -> pure $ StepResultAwaitable x + Just (Left ex) -> throwIO ex + Just (Right r) -> pure $ StepResultCompleted r go (AsyncIOBind x fn) = do go x >>= \case - Left awaitable -> bindAwaitable awaitable (either throwM fn) - Right r -> go (fn r) + StepResultCompleted r -> go (fn r) + StepResultAwaitable awaitable -> pure $ StepResultAsyncCall (asyncCall awaitable foobar) + (StepResultAsyncCall call) -> continueAfterCall call foobar + where + foobar = either throwM fn go (AsyncIOCatch x handler) = do try (go x) >>= \case Left ex -> go (handler ex) - Right (Left awaitable) -> bindAwaitable awaitable (either (handleSomeException handler) pure) - Right (Right r) -> pure $ Right r - go AsyncIOAskPool = pure $ Right pool - - bindAwaitable :: Awaitable a -> (Either SomeException a -> AsyncIO r) -> IO (Either (Awaitable r) r) - bindAwaitable input work = fmap Left . atomically $ queueBlockedWork pool input work - -handleSomeException :: (Exception e, MonadThrow m) => (e -> m a) -> SomeException -> m a -handleSomeException handler ex = maybe (throwM ex) handler (fromException ex) + Right (StepResultCompleted r) -> pure $ StepResultCompleted r + Right (StepResultAwaitable awaitable) -> pure $ StepResultAsyncCall (asyncCall awaitable foobar) + Right (StepResultAsyncCall c) -> continueAfterCall c foobar + where + foobar = either (handleSomeException handler) pure + go AsyncIOAskPool = pure $ StepResultCompleted pool + + continueAfterCall :: AsyncCall a -> (Either SomeException a -> AsyncIO r) -> IO (StepResult r) + continueAfterCall call fn = do + -- Tail call optimization is not possible when having to wait for the result of a call, so the call is queued as a new work item. + awaitable <- atomically $ queueBlockedWork pool call + pure $ StepResultAsyncCall $ asyncCall awaitable fn + + handleSomeException :: forall e a m. (Exception e, MonadThrow m) => (e -> m a) -> SomeException -> m a + handleSomeException handler ex = maybe (throwM ex) handler (fromException ex) awaitResult :: AsyncIO (Awaitable r) -> AsyncIO r awaitResult = (await =<<) @@ -115,35 +138,119 @@ awaitResult = (await =<<) -- AsyncIORuntime -- AsyncIOContext data Pool = Pool { - queue :: TVar [AsyncQueueItem] + configuration :: PoolConfiguraiton, + queue :: TVar (Seq AsyncWorkItem), + threads :: TVar (HashSet ThreadId) } -data AsyncQueueItem = forall a. AsyncQueueItem (Awaitable a) (Either SomeException a -> AsyncIO ()) +newtype AsyncTask r = AsyncTask (Awaitable r) +instance IsAwaitable r (AsyncTask r) where + toAwaitable (AsyncTask awaitable) = awaitable + +data AsyncWorkItem = forall r. AsyncWorkItem (AsyncCall r) (Awaitable r -> IO ()) + +newtype AsyncWorkResult r = AsyncWorkResult (TMVar (Awaitable r)) +instance IsAwaitable r (AsyncWorkResult r) where + peekSTM (AsyncWorkResult var) = peekSTM =<< readTMVar var + +completeWork :: AsyncWorkResult r -> Awaitable r -> IO () +completeWork (AsyncWorkResult var) = atomically . putTMVar var + -withPool :: (Pool -> IO a) -> IO a -withPool = undefined +data PoolConfiguraiton = PoolConfiguraiton -withDefaultPool :: (Pool -> IO a) -> IO a -withDefaultPool = (=<< atomically defaultPool) +defaultPoolConfiguration :: PoolConfiguraiton +defaultPoolConfiguration = PoolConfiguraiton -defaultPool :: STM Pool -defaultPool = do - queue <- newTVar [] - pure Pool { - queue +withPool :: PoolConfiguraiton -> AsyncIO r -> IO r +withPool configuration work = mask $ \unmask -> do + pool <- newPool configuration + task <- atomically $ newTask pool work + + result <- awaitAndThrowTo task unmask `finally` pure () -- TODO dispose pool + + either throwIO pure result + + where + awaitAndThrowTo :: AsyncTask r -> (forall a. IO a -> IO a) -> IO (Either SomeException r) + -- TODO handle asynchronous exceptions (stop pool) + awaitAndThrowTo task unmask = unmask (atomically (awaitSTM task)) `catchAll` (\ex -> undefined >> awaitAndThrowTo task unmask) + +withDefaultPool :: AsyncIO a -> IO a +withDefaultPool = withPool defaultPoolConfiguration + +newPool :: PoolConfiguraiton -> IO Pool +newPool configuration = do + queue <- newTVarIO mempty + threads <- newTVarIO mempty + let pool = Pool { + configuration, + queue, + threads } + void $ forkIO (managePool pool) + pure pool + where + managePool :: Pool -> IO () + managePool pool = forever $ do + worker <- atomically $ takeWorkItem (toWorker pool) pool + void $ forkIO worker + + workerSetup :: Pool -> IO () -> (forall a. IO a -> IO a) -> IO () + workerSetup pool worker unmask = do + unmask worker + +newTask :: Pool -> AsyncIO r -> STM (AsyncTask r) +newTask pool work = do + awaitable <- queueWork pool work + pure $ AsyncTask awaitable + + +queueWorkItem :: Pool -> AsyncWorkItem -> STM () +queueWorkItem pool item = do + modifyTVar' (queue pool) (|> item) queueWork :: Pool -> AsyncIO r -> STM (Awaitable r) -queueWork pool work = queueBlockedWork pool (successfulAwaitable ()) (const work) - -queueBlockedWork :: Pool -> Awaitable a -> (Either SomeException a -> AsyncIO r) -> STM (Awaitable r) -queueBlockedWork pool input work = do - resultVar <- newAsyncVarSTM - -- TODO masking state - let actualWork = try . work >=> putAsyncVarEither_ resultVar - undefined +queueWork pool work = queueBlockedWork pool $ asyncCall (successfulAwaitable ()) (const work) + +queueBlockedWork :: Pool -> AsyncCall r -> STM (Awaitable r) +queueBlockedWork pool call = do + resultVar <- AsyncWorkResult <$> newEmptyTMVar + queueWorkItem pool $ AsyncWorkItem call (completeWork resultVar) pure $ toAwaitable resultVar +toWorker :: Pool -> AsyncWorkItem -> STM (Maybe (IO ())) +toWorker pool (AsyncWorkItem (AsyncCall inputAwaitable work) putResult) = worker <<$>> peekSTM inputAwaitable + where + worker input = do + threadId <- myThreadId + atomically $ modifyTVar (threads pool) $ insert threadId + + stepAsyncIO pool (work input) >>= \case + StepResultCompleted r -> putResult (successfulAwaitable r) + StepResultAwaitable x -> putResult x + -- This is an async tail call. Tail call optimization is performed by reusing `putResult`. + StepResultAsyncCall call -> atomically $ queueWorkItem pool (AsyncWorkItem call putResult) + + atomically . modifyTVar (threads pool) . delete =<< myThreadId + + +takeWorkItem :: forall a. (AsyncWorkItem -> STM (Maybe a)) -> Pool -> STM a +takeWorkItem fn pool = do + items <- readTVar (queue pool) + (item, remaining) <- nextWorkItem Empty items + writeTVar (queue pool) remaining + pure item + where + nextWorkItem :: Seq AsyncWorkItem -> Seq AsyncWorkItem -> STM (a, Seq AsyncWorkItem) + nextWorkItem remaining (item :<| seq) = do + fn item >>= \case + Just work -> pure (work, remaining <> seq) + Nothing -> nextWorkItem (remaining |> item) seq + nextWorkItem _ _ = retry + + + -- * Awaiting multiple asyncs diff --git a/test/Quasar/Observable/ObservableHashMapSpec.hs b/test/Quasar/Observable/ObservableHashMapSpec.hs index f3cc07e..ae526ef 100644 --- a/test/Quasar/Observable/ObservableHashMapSpec.hs +++ b/test/Quasar/Observable/ObservableHashMapSpec.hs @@ -1,6 +1,5 @@ module Quasar.Observable.ObservableHashMapSpec (spec) where -import Quasar.Core import Quasar.Disposable import Quasar.Observable import Quasar.Observable.Delta diff --git a/test/Quasar/Observable/ObservablePrioritySpec.hs b/test/Quasar/Observable/ObservablePrioritySpec.hs index a09b9e3..f05d11a 100644 --- a/test/Quasar/Observable/ObservablePrioritySpec.hs +++ b/test/Quasar/Observable/ObservablePrioritySpec.hs @@ -1,6 +1,5 @@ module Quasar.Observable.ObservablePrioritySpec (spec) where -import Quasar.Core import Quasar.Disposable import Quasar.Observable import Quasar.Observable.ObservablePriority (ObservablePriority) -- GitLab