Skip to content
Snippets Groups Projects
Commit 50b1ea01 authored by Jens Nolte's avatar Jens Nolte
Browse files

Update stepAsyncIO for tail call optimization and actually start tasks

parent f59ef367
No related branches found
No related tags found
No related merge requests found
...@@ -65,6 +65,7 @@ library ...@@ -65,6 +65,7 @@ library
build-depends: build-depends:
base >=4.7 && <5, base >=4.7 && <5,
binary, binary,
containers,
exceptions, exceptions,
ghc-prim, ghc-prim,
hashable, hashable,
......
...@@ -7,12 +7,12 @@ module Quasar.Core ( ...@@ -7,12 +7,12 @@ module Quasar.Core (
awaitResult, awaitResult,
) where ) where
import Control.Concurrent (forkIOWithUnmask) import Control.Concurrent (ThreadId, forkIO, forkIOWithUnmask, myThreadId)
import Control.Concurrent.STM import Control.Concurrent.STM
import Control.Exception (MaskingState(..), getMaskingState) import Control.Exception (MaskingState(..), getMaskingState)
import Control.Monad.Catch import Control.Monad.Catch
import Data.Maybe (isJust) import Data.HashSet
import Data.Void (absurd) import Data.Sequence
import Quasar.Awaitable import Quasar.Awaitable
import Quasar.Prelude import Quasar.Prelude
...@@ -71,39 +71,62 @@ await = AsyncIOAwait . toAwaitable ...@@ -71,39 +71,62 @@ await = AsyncIOAwait . toAwaitable
-- | Run an `AsyncIO` to completion and return the result. -- | Run an `AsyncIO` to completion and return the result.
runAsyncIO :: AsyncIO r -> IO r runAsyncIO :: AsyncIO r -> IO r
runAsyncIO x = withDefaultPool $ \pool -> runAsyncIOWithPool pool x runAsyncIO = withDefaultPool
runAsyncIOWithPool :: Pool -> AsyncIO r -> IO r runOnPool :: Pool -> AsyncIO r -> AsyncIO r
runAsyncIOWithPool pool x = do runOnPool pool work = await =<< (liftIO . atomically $ queueWork pool work)
stepResult <- stepAsyncIO pool x
case stepResult of
Left awaitable -> either throwIO pure =<< atomically (awaitSTM awaitable)
Right result -> pure result
data AsyncCall r = forall a. AsyncCall (Awaitable a) (Either SomeException a -> AsyncIO r)
asyncCall :: Awaitable a -> (Either SomeException a -> AsyncIO r) -> AsyncCall r
asyncCall = AsyncCall
stepAsyncIO :: Pool -> AsyncIO r -> IO (Either (Awaitable r) r) data StepResult r
= StepResultCompleted r
| StepResultAwaitable (Awaitable r)
| StepResultAsyncCall (AsyncCall r)
stepAsyncIO :: Pool -> AsyncIO r -> IO (StepResult r)
stepAsyncIO pool = go stepAsyncIO pool = go
where where
go :: AsyncIO r -> IO (Either (Awaitable r) r) --packResult :: Either SomeException (StepResult r) -> Either (AsyncCall r) (Awaitable r)
go (AsyncIOCompleted x) = Right <$> either throwIO pure x --packResult (Left ex) = Right (failedAwaitable ex)
go (AsyncIOIO x) = Right <$> x --packResult (Right (StepResultCompleted x)) = Right (successfulAwaitable x)
go (AsyncIOAwait x) = pure $ Left x --packResult (Right (StepResultAwaitable x)) = Right x
--packResult (Right (StepResultAsyncCall x)) = Left x
go :: AsyncIO r -> IO (StepResult r)
go (AsyncIOCompleted x) = StepResultCompleted <$> either throwIO pure x
go (AsyncIOIO x) = StepResultCompleted <$> x
go (AsyncIOAwait x) =
atomically (peekSTM x) >>= \case
Nothing -> pure $ StepResultAwaitable x
Just (Left ex) -> throwIO ex
Just (Right r) -> pure $ StepResultCompleted r
go (AsyncIOBind x fn) = do go (AsyncIOBind x fn) = do
go x >>= \case go x >>= \case
Left awaitable -> bindAwaitable awaitable (either throwM fn) StepResultCompleted r -> go (fn r)
Right r -> go (fn r) StepResultAwaitable awaitable -> pure $ StepResultAsyncCall (asyncCall awaitable foobar)
(StepResultAsyncCall call) -> continueAfterCall call foobar
where
foobar = either throwM fn
go (AsyncIOCatch x handler) = do go (AsyncIOCatch x handler) = do
try (go x) >>= \case try (go x) >>= \case
Left ex -> go (handler ex) Left ex -> go (handler ex)
Right (Left awaitable) -> bindAwaitable awaitable (either (handleSomeException handler) pure) Right (StepResultCompleted r) -> pure $ StepResultCompleted r
Right (Right r) -> pure $ Right r Right (StepResultAwaitable awaitable) -> pure $ StepResultAsyncCall (asyncCall awaitable foobar)
go AsyncIOAskPool = pure $ Right pool Right (StepResultAsyncCall c) -> continueAfterCall c foobar
where
bindAwaitable :: Awaitable a -> (Either SomeException a -> AsyncIO r) -> IO (Either (Awaitable r) r) foobar = either (handleSomeException handler) pure
bindAwaitable input work = fmap Left . atomically $ queueBlockedWork pool input work go AsyncIOAskPool = pure $ StepResultCompleted pool
handleSomeException :: (Exception e, MonadThrow m) => (e -> m a) -> SomeException -> m a continueAfterCall :: AsyncCall a -> (Either SomeException a -> AsyncIO r) -> IO (StepResult r)
handleSomeException handler ex = maybe (throwM ex) handler (fromException ex) continueAfterCall call fn = do
-- Tail call optimization is not possible when having to wait for the result of a call, so the call is queued as a new work item.
awaitable <- atomically $ queueBlockedWork pool call
pure $ StepResultAsyncCall $ asyncCall awaitable fn
handleSomeException :: forall e a m. (Exception e, MonadThrow m) => (e -> m a) -> SomeException -> m a
handleSomeException handler ex = maybe (throwM ex) handler (fromException ex)
awaitResult :: AsyncIO (Awaitable r) -> AsyncIO r awaitResult :: AsyncIO (Awaitable r) -> AsyncIO r
awaitResult = (await =<<) awaitResult = (await =<<)
...@@ -115,35 +138,119 @@ awaitResult = (await =<<) ...@@ -115,35 +138,119 @@ awaitResult = (await =<<)
-- AsyncIORuntime -- AsyncIORuntime
-- AsyncIOContext -- AsyncIOContext
data Pool = Pool { data Pool = Pool {
queue :: TVar [AsyncQueueItem] configuration :: PoolConfiguraiton,
queue :: TVar (Seq AsyncWorkItem),
threads :: TVar (HashSet ThreadId)
} }
data AsyncQueueItem = forall a. AsyncQueueItem (Awaitable a) (Either SomeException a -> AsyncIO ()) newtype AsyncTask r = AsyncTask (Awaitable r)
instance IsAwaitable r (AsyncTask r) where
toAwaitable (AsyncTask awaitable) = awaitable
data AsyncWorkItem = forall r. AsyncWorkItem (AsyncCall r) (Awaitable r -> IO ())
newtype AsyncWorkResult r = AsyncWorkResult (TMVar (Awaitable r))
instance IsAwaitable r (AsyncWorkResult r) where
peekSTM (AsyncWorkResult var) = peekSTM =<< readTMVar var
completeWork :: AsyncWorkResult r -> Awaitable r -> IO ()
completeWork (AsyncWorkResult var) = atomically . putTMVar var
withPool :: (Pool -> IO a) -> IO a data PoolConfiguraiton = PoolConfiguraiton
withPool = undefined
withDefaultPool :: (Pool -> IO a) -> IO a defaultPoolConfiguration :: PoolConfiguraiton
withDefaultPool = (=<< atomically defaultPool) defaultPoolConfiguration = PoolConfiguraiton
defaultPool :: STM Pool withPool :: PoolConfiguraiton -> AsyncIO r -> IO r
defaultPool = do withPool configuration work = mask $ \unmask -> do
queue <- newTVar [] pool <- newPool configuration
pure Pool { task <- atomically $ newTask pool work
queue
result <- awaitAndThrowTo task unmask `finally` pure () -- TODO dispose pool
either throwIO pure result
where
awaitAndThrowTo :: AsyncTask r -> (forall a. IO a -> IO a) -> IO (Either SomeException r)
-- TODO handle asynchronous exceptions (stop pool)
awaitAndThrowTo task unmask = unmask (atomically (awaitSTM task)) `catchAll` (\ex -> undefined >> awaitAndThrowTo task unmask)
withDefaultPool :: AsyncIO a -> IO a
withDefaultPool = withPool defaultPoolConfiguration
newPool :: PoolConfiguraiton -> IO Pool
newPool configuration = do
queue <- newTVarIO mempty
threads <- newTVarIO mempty
let pool = Pool {
configuration,
queue,
threads
} }
void $ forkIO (managePool pool)
pure pool
where
managePool :: Pool -> IO ()
managePool pool = forever $ do
worker <- atomically $ takeWorkItem (toWorker pool) pool
void $ forkIO worker
workerSetup :: Pool -> IO () -> (forall a. IO a -> IO a) -> IO ()
workerSetup pool worker unmask = do
unmask worker
newTask :: Pool -> AsyncIO r -> STM (AsyncTask r)
newTask pool work = do
awaitable <- queueWork pool work
pure $ AsyncTask awaitable
queueWorkItem :: Pool -> AsyncWorkItem -> STM ()
queueWorkItem pool item = do
modifyTVar' (queue pool) (|> item)
queueWork :: Pool -> AsyncIO r -> STM (Awaitable r) queueWork :: Pool -> AsyncIO r -> STM (Awaitable r)
queueWork pool work = queueBlockedWork pool (successfulAwaitable ()) (const work) queueWork pool work = queueBlockedWork pool $ asyncCall (successfulAwaitable ()) (const work)
queueBlockedWork :: Pool -> Awaitable a -> (Either SomeException a -> AsyncIO r) -> STM (Awaitable r) queueBlockedWork :: Pool -> AsyncCall r -> STM (Awaitable r)
queueBlockedWork pool input work = do queueBlockedWork pool call = do
resultVar <- newAsyncVarSTM resultVar <- AsyncWorkResult <$> newEmptyTMVar
-- TODO masking state queueWorkItem pool $ AsyncWorkItem call (completeWork resultVar)
let actualWork = try . work >=> putAsyncVarEither_ resultVar
undefined
pure $ toAwaitable resultVar pure $ toAwaitable resultVar
toWorker :: Pool -> AsyncWorkItem -> STM (Maybe (IO ()))
toWorker pool (AsyncWorkItem (AsyncCall inputAwaitable work) putResult) = worker <<$>> peekSTM inputAwaitable
where
worker input = do
threadId <- myThreadId
atomically $ modifyTVar (threads pool) $ insert threadId
stepAsyncIO pool (work input) >>= \case
StepResultCompleted r -> putResult (successfulAwaitable r)
StepResultAwaitable x -> putResult x
-- This is an async tail call. Tail call optimization is performed by reusing `putResult`.
StepResultAsyncCall call -> atomically $ queueWorkItem pool (AsyncWorkItem call putResult)
atomically . modifyTVar (threads pool) . delete =<< myThreadId
takeWorkItem :: forall a. (AsyncWorkItem -> STM (Maybe a)) -> Pool -> STM a
takeWorkItem fn pool = do
items <- readTVar (queue pool)
(item, remaining) <- nextWorkItem Empty items
writeTVar (queue pool) remaining
pure item
where
nextWorkItem :: Seq AsyncWorkItem -> Seq AsyncWorkItem -> STM (a, Seq AsyncWorkItem)
nextWorkItem remaining (item :<| seq) = do
fn item >>= \case
Just work -> pure (work, remaining <> seq)
Nothing -> nextWorkItem (remaining |> item) seq
nextWorkItem _ _ = retry
-- * Awaiting multiple asyncs -- * Awaiting multiple asyncs
......
module Quasar.Observable.ObservableHashMapSpec (spec) where module Quasar.Observable.ObservableHashMapSpec (spec) where
import Quasar.Core
import Quasar.Disposable import Quasar.Disposable
import Quasar.Observable import Quasar.Observable
import Quasar.Observable.Delta import Quasar.Observable.Delta
......
module Quasar.Observable.ObservablePrioritySpec (spec) where module Quasar.Observable.ObservablePrioritySpec (spec) where
import Quasar.Core
import Quasar.Disposable import Quasar.Disposable
import Quasar.Observable import Quasar.Observable
import Quasar.Observable.ObservablePriority (ObservablePriority) import Quasar.Observable.ObservablePriority (ObservablePriority)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment