Introduce stepAsyncIO to simplify AsyncIO control flow

Co-authored-by: default avatarJan Beinke <>
parent 73e448b1
...@@ -23,120 +23,135 @@ import Quasar.Prelude ...@@ -23,120 +23,135 @@ import Quasar.Prelude
-- * AsyncIO -- * AsyncIO
data AsyncIO r data AsyncIO r
= AsyncIOSuccess r = AsyncIOCompleted (Either SomeException r)
| AsyncIOFailure SomeException
| AsyncIOIO (IO r) | AsyncIOIO (IO r)
| AsyncIOAsync (Awaitable r) | AsyncIOAwait (Awaitable r)
| AsyncIOPlumbing (MaskingState -> CancellationToken -> IO (AsyncIO r)) | forall a. AsyncIOBind (AsyncIO a) (a -> AsyncIO r)
| forall e. Exception e => AsyncIOCatch (AsyncIO r) (e -> AsyncIO r)
| r ~ Pool => AsyncIOAskPool
instance Functor AsyncIO where instance Functor AsyncIO where
fmap fn (AsyncIOSuccess x) = AsyncIOSuccess (fn x) fmap fn (AsyncIOCompleted x) = AsyncIOCompleted (fn <$> x)
fmap _ (AsyncIOFailure x) = AsyncIOFailure x
fmap fn (AsyncIOIO x) = AsyncIOIO (fn <$> x) fmap fn (AsyncIOIO x) = AsyncIOIO (fn <$> x)
fmap fn (AsyncIOAsync x) = AsyncIOAsync (fn <$> x) fmap fn (AsyncIOAwait x) = AsyncIOAwait (fn <$> x)
fmap fn (AsyncIOPlumbing x) = mapPlumbing x (fmap (fmap fn)) fmap fn (AsyncIOBind x y) = AsyncIOBind x (fn <<$>> y)
fmap fn (AsyncIOCatch x y) = AsyncIOCatch (fn <$> x) (fn <<$>> y)
fmap fn AsyncIOAskPool = AsyncIOBind AsyncIOAskPool (pure . fn)
instance Applicative AsyncIO where instance Applicative AsyncIO where
pure = AsyncIOSuccess pure = AsyncIOCompleted . Right
(<*>) pf px = pf >>= \f -> f <$> px (<*>) pf px = pf >>= \f -> f <$> px
liftA2 f px py = px >>= \x -> f x <$> py liftA2 f px py = px >>= \x -> f x <$> py
instance Monad AsyncIO where instance Monad AsyncIO where
(>>=) :: forall a b. AsyncIO a -> (a -> AsyncIO b) -> AsyncIO b (>>=) :: forall a b. AsyncIO a -> (a -> AsyncIO b) -> AsyncIO b
(>>=) (AsyncIOSuccess x) fn = fn x x >>= fn = AsyncIOBind x fn
(>>=) (AsyncIOFailure x) _ = AsyncIOFailure x
(>>=) (AsyncIOIO x) fn = AsyncIOPlumbing $ \maskingState cancellationToken -> do
-- TODO masking and cancellation
either AsyncIOFailure fn <$> try x
(>>=) (AsyncIOAsync x) fn = bindAsync x fn
(>>=) (AsyncIOPlumbing x) fn = mapPlumbing x (fmap (>>= fn))
instance MonadIO AsyncIO where instance MonadIO AsyncIO where
liftIO = AsyncIOIO liftIO = AsyncIOIO
instance MonadThrow AsyncIO where instance MonadThrow AsyncIO where
throwM = AsyncIOFailure . toException throwM = AsyncIOCompleted . Left . toException
instance MonadCatch AsyncIO where instance MonadCatch AsyncIO where
catch :: Exception e => AsyncIO a -> (e -> AsyncIO a) -> AsyncIO a catch :: Exception e => AsyncIO a -> (e -> AsyncIO a) -> AsyncIO a
catch x@(AsyncIOSuccess _) _ = x catch = AsyncIOCatch
catch x@(AsyncIOFailure ex) handler = maybe x handler (fromException ex)
catch (AsyncIOIO x) handler = AsyncIOIO (try x) >>= handleEither handler
catch (AsyncIOAsync x) handler = bindAsyncCatch x (handleEither handler)
catch (AsyncIOPlumbing x) handler = mapPlumbing x (fmap (`catch` handler))
handleEither :: Exception e => (e -> AsyncIO a) -> Either SomeException a -> AsyncIO a
handleEither handler (Left ex) = maybe (AsyncIOFailure ex) handler (fromException ex)
handleEither _ (Right r) = pure r
mapPlumbing :: (MaskingState -> CancellationToken -> IO (AsyncIO a)) -> (IO (AsyncIO a) -> IO (AsyncIO b)) -> AsyncIO b
mapPlumbing plumbing fn = AsyncIOPlumbing $ \maskingState cancellationToken -> fn (plumbing maskingState cancellationToken)
bindAsync :: forall a b. Awaitable a -> (a -> AsyncIO b) -> AsyncIO b
bindAsync x fn = bindAsyncCatch x (either AsyncIOFailure fn)
bindAsyncCatch :: forall a b. Awaitable a -> (Either SomeException a -> AsyncIO b) -> AsyncIO b
bindAsyncCatch x fn = undefined -- AsyncIOPlumbing $ \maskingState cancellationToken -> do
--var <- newAsyncVar
--disposableMVar <- newEmptyMVar
--go maskingState cancellationToken var disposableMVar
-- go maskingState cancellationToken var disposableMVar = do
-- disposable <- onResult x (failAsyncVar_ var) $ \x -> do
-- (putAsyncIOResult . fn) x
-- -- TODO update mvar and dispose when completed
-- putMVar disposableMVar disposable
-- pure $ awaitUnlessCancellationRequested cancellationToken var
-- where
-- put = putAsyncVarEither var
-- putAsyncIOResult :: AsyncIO b -> IO ()
-- putAsyncIOResult (AsyncIOSuccess x) = put (Right x)
-- putAsyncIOResult (AsyncIOFailure x) = put (Left x)
-- putAsyncIOResult (AsyncIOIO x) = try x >>= put
-- putAsyncIOResult (AsyncIOAsync x) = onResult_ x (put . Left) put
-- putAsyncIOResult (AsyncIOPlumbing x) = x maskingState cancellationToken >>= putAsyncIOResult
-- | Run the synchronous part of an `AsyncIO` and then return an `Awaitable` that can be used to wait for completion of the synchronous part. -- | Run the synchronous part of an `AsyncIO` and then return an `Awaitable` that can be used to wait for completion of the synchronous part.
async :: AsyncIO r -> AsyncIO (Awaitable r) async :: AsyncIO r -> AsyncIO (Awaitable r)
async (AsyncIOSuccess x) = pure $ successfulAwaitable x async (AsyncIOCompleted x) = pure $ completedAwaitable x
async (AsyncIOFailure x) = pure $ failedAwaitable x async (AsyncIOAwait x) = pure x
async (AsyncIOIO x) = liftIO $ either failedAwaitable successfulAwaitable <$> try x async x = do
async (AsyncIOAsync x) = pure x -- TODO caching pool <- askPool
async (AsyncIOPlumbing x) = mapPlumbing x (fmap async) liftIO . atomically $ queueWork pool x
askPool :: AsyncIO Pool
askPool = AsyncIOAskPool
await :: IsAwaitable r a => a -> AsyncIO r await :: IsAwaitable r a => a -> AsyncIO r
await = AsyncIOAsync . toAwaitable await = AsyncIOAwait . toAwaitable
-- | Run an `AsyncIO` to completion and return the result. -- | Run an `AsyncIO` to completion and return the result.
runAsyncIO :: AsyncIO r -> IO r runAsyncIO :: AsyncIO r -> IO r
runAsyncIO (AsyncIOSuccess x) = pure x runAsyncIO x = withDefaultPool $ \pool -> runAsyncIOWithPool pool x
runAsyncIO (AsyncIOFailure x) = throwIO x
runAsyncIO (AsyncIOIO x) = x runAsyncIOWithPool :: Pool -> AsyncIO r -> IO r
runAsyncIO (AsyncIOAsync x) = either throwIO pure =<< atomically (awaitSTM x) runAsyncIOWithPool pool x = do
runAsyncIO (AsyncIOPlumbing x) = do stepResult <- stepAsyncIO pool x
maskingState <- getMaskingState case stepResult of
withCancellationToken $ x maskingState >=> runAsyncIO Left awaitable -> either throwIO pure =<< atomically (awaitSTM awaitable)
Right result -> pure result
stepAsyncIO :: Pool -> AsyncIO r -> IO (Either (Awaitable r) r)
stepAsyncIO pool = go
go :: AsyncIO r -> IO (Either (Awaitable r) r)
go (AsyncIOCompleted x) = Right <$> either throwIO pure x
go (AsyncIOIO x) = Right <$> x
go (AsyncIOAwait x) = pure $ Left x
go (AsyncIOBind x fn) = do
go x >>= \case
Left awaitable -> bindAwaitable awaitable (either throwM fn)
Right r -> go (fn r)
go (AsyncIOCatch x handler) = do
try (go x) >>= \case
Left ex -> go (handler ex)
Right (Left awaitable) -> bindAwaitable awaitable (either (handleSomeException handler) pure)
Right (Right r) -> pure $ Right r
go AsyncIOAskPool = pure $ Right pool
bindAwaitable :: Awaitable a -> (Either SomeException a -> AsyncIO r) -> IO (Either (Awaitable r) r)
bindAwaitable input work = fmap Left . atomically $ queueBlockedWork pool input work
handleSomeException :: (Exception e, MonadThrow m) => (e -> m a) -> SomeException -> m a
handleSomeException handler ex = maybe (throwM ex) handler (fromException ex)
awaitResult :: AsyncIO (Awaitable r) -> AsyncIO r awaitResult :: AsyncIO (Awaitable r) -> AsyncIO r
awaitResult = (await =<<) awaitResult = (await =<<)
-- TODO rename
-- AsyncIOPool
-- AsyncPool
-- ThreadPool
-- AsyncIORuntime
-- AsyncIOContext
data Pool = Pool {
queue :: TVar [AsyncQueueItem]
data AsyncQueueItem = forall a. AsyncQueueItem (Awaitable a) (Either SomeException a -> AsyncIO ())
withPool :: (Pool -> IO a) -> IO a
withPool = undefined
-- ** Forking asyncs withDefaultPool :: (Pool -> IO a) -> IO a
withDefaultPool = (=<< atomically defaultPool)
-- TODO defaultPool :: STM Pool
--class IsAsyncForkable m where defaultPool = do
-- asyncThread :: m r -> AsyncIO r queue <- newTVar []
pure Pool {
queueWork :: Pool -> AsyncIO r -> STM (Awaitable r)
queueWork pool work = queueBlockedWork pool (successfulAwaitable ()) (const work)
queueBlockedWork :: Pool -> Awaitable a -> (Either SomeException a -> AsyncIO r) -> STM (Awaitable r)
queueBlockedWork pool input work = do
resultVar <- newAsyncVarSTM
-- TODO masking state
let actualWork = try . work >=> putAsyncVarEither_ resultVar
pure $ toAwaitable resultVar
-- * Awaiting multiple asyncs -- * Awaiting multiple asyncs
awaitEither :: (IsAwaitable ra a , IsAwaitable rb b) => a -> b -> AsyncIO (Either ra rb) awaitEither :: (IsAwaitable ra a , IsAwaitable rb b) => a -> b -> AsyncIO (Either ra rb)
awaitEither x y = AsyncIOPlumbing $ \_ _ -> AsyncIOAsync <$> awaitEitherPlumbing x y awaitEither x y = await =<< liftIO (awaitEitherPlumbing x y)
awaitEitherPlumbing :: (IsAwaitable ra a , IsAwaitable rb b) => a -> b -> IO (Awaitable (Either ra rb)) awaitEitherPlumbing :: (IsAwaitable ra a , IsAwaitable rb b) => a -> b -> IO (Awaitable (Either ra rb))
awaitEitherPlumbing x y = awaitableFromSTM $ peekEitherSTM x y awaitEitherPlumbing x y = awaitableFromSTM $ peekEitherSTM x y
