From 1d10e90efb7a7856bb8214aa02869c4b7fbcec90 Mon Sep 17 00:00:00 2001
From: Jens Nolte <git@queezle.net>
Date: Mon, 26 Jul 2021 00:33:07 +0200
Subject: [PATCH] Introduce stepAsyncIO to simplify AsyncIO control flow

Co-authored-by: Jan Beinke <git@janbeinke.com>
---
 src/Quasar/Core.hs | 159 +++++++++++++++++++++++++--------------------
 1 file changed, 87 insertions(+), 72 deletions(-)

diff --git a/src/Quasar/Core.hs b/src/Quasar/Core.hs
index b5e56e4..4d44ba6 100644
--- a/src/Quasar/Core.hs
+++ b/src/Quasar/Core.hs
@@ -23,120 +23,135 @@ import Quasar.Prelude
 -- * AsyncIO
 
 data AsyncIO r
-  = AsyncIOSuccess r
-  | AsyncIOFailure SomeException
+  = AsyncIOCompleted (Either SomeException r)
   | AsyncIOIO (IO r)
-  | AsyncIOAsync (Awaitable r)
-  | AsyncIOPlumbing (MaskingState -> CancellationToken -> IO (AsyncIO r))
+  | AsyncIOAwait (Awaitable r)
+  | forall a. AsyncIOBind (AsyncIO a) (a -> AsyncIO r)
+  | forall e. Exception e => AsyncIOCatch (AsyncIO r) (e -> AsyncIO r)
+  | r ~ Pool => AsyncIOAskPool
 
 instance Functor AsyncIO where
-  fmap fn (AsyncIOSuccess x) = AsyncIOSuccess (fn x)
-  fmap _ (AsyncIOFailure x) = AsyncIOFailure x
+  fmap fn (AsyncIOCompleted x) = AsyncIOCompleted (fn <$> x)
   fmap fn (AsyncIOIO x) = AsyncIOIO (fn <$> x)
-  fmap fn (AsyncIOAsync x) = AsyncIOAsync (fn <$> x)
-  fmap fn (AsyncIOPlumbing x) = mapPlumbing x (fmap (fmap fn))
+  fmap fn (AsyncIOAwait x) = AsyncIOAwait (fn <$> x)
+  fmap fn (AsyncIOBind x y) = AsyncIOBind x (fn <<$>> y)
+  fmap fn (AsyncIOCatch x y) = AsyncIOCatch (fn <$> x) (fn <<$>> y)
+  fmap fn AsyncIOAskPool = AsyncIOBind AsyncIOAskPool (pure . fn)
 
 instance Applicative AsyncIO where
-  pure = AsyncIOSuccess
+  pure = AsyncIOCompleted . Right
   (<*>) pf px = pf >>= \f -> f <$> px
   liftA2 f px py = px >>= \x -> f x <$> py
 
 instance Monad AsyncIO where
   (>>=) :: forall a b. AsyncIO a -> (a -> AsyncIO b) -> AsyncIO b
-  (>>=) (AsyncIOSuccess x) fn = fn x
-  (>>=) (AsyncIOFailure x) _ = AsyncIOFailure x
-  (>>=) (AsyncIOIO x) fn = AsyncIOPlumbing $ \maskingState cancellationToken -> do
-    -- TODO masking and cancellation
-    either AsyncIOFailure fn <$> try x
-  (>>=) (AsyncIOAsync x) fn = bindAsync x fn
-  (>>=) (AsyncIOPlumbing x) fn = mapPlumbing x (fmap (>>= fn))
+  x >>= fn = AsyncIOBind x fn
 
 instance MonadIO AsyncIO where
   liftIO = AsyncIOIO
 
 instance MonadThrow AsyncIO where
-  throwM = AsyncIOFailure . toException
+  throwM = AsyncIOCompleted . Left . toException
 
 instance MonadCatch AsyncIO where
   catch :: Exception e => AsyncIO a -> (e -> AsyncIO a) -> AsyncIO a
-  catch x@(AsyncIOSuccess _) _ = x
-  catch x@(AsyncIOFailure ex) handler = maybe x handler (fromException ex)
-  catch (AsyncIOIO x) handler = AsyncIOIO (try x) >>= handleEither handler
-  catch (AsyncIOAsync x) handler = bindAsyncCatch x (handleEither handler)
-  catch (AsyncIOPlumbing x) handler = mapPlumbing x (fmap (`catch` handler))
-
-handleEither :: Exception e => (e -> AsyncIO a) -> Either SomeException a -> AsyncIO a
-handleEither handler (Left ex) = maybe (AsyncIOFailure ex) handler (fromException ex)
-handleEither _ (Right r) = pure r
-
-mapPlumbing :: (MaskingState -> CancellationToken -> IO (AsyncIO a)) -> (IO (AsyncIO a) -> IO (AsyncIO b)) -> AsyncIO b
-mapPlumbing plumbing fn = AsyncIOPlumbing $ \maskingState cancellationToken -> fn (plumbing maskingState cancellationToken)
-
-bindAsync :: forall a b. Awaitable a -> (a -> AsyncIO b) -> AsyncIO b
-bindAsync x fn = bindAsyncCatch x (either AsyncIOFailure fn)
-
-bindAsyncCatch :: forall a b. Awaitable a -> (Either SomeException a -> AsyncIO b) -> AsyncIO b
-bindAsyncCatch x fn = undefined -- AsyncIOPlumbing $ \maskingState cancellationToken -> do
-  --var <- newAsyncVar
-  --disposableMVar <- newEmptyMVar
-  --go maskingState cancellationToken var disposableMVar
-  --where
-  --  go maskingState cancellationToken var disposableMVar = do
-  --    disposable <- onResult x (failAsyncVar_ var) $ \x -> do
-  --      (putAsyncIOResult . fn) x
-  --    -- TODO update mvar and dispose when completed
-  --    putMVar disposableMVar disposable
-  --    pure $ awaitUnlessCancellationRequested cancellationToken var
-  --    where
-  --      put = putAsyncVarEither var
-  --      putAsyncIOResult :: AsyncIO b -> IO ()
-  --      putAsyncIOResult (AsyncIOSuccess x) = put (Right x)
-  --      putAsyncIOResult (AsyncIOFailure x) = put (Left x)
-  --      putAsyncIOResult (AsyncIOIO x) = try x >>= put
-  --      putAsyncIOResult (AsyncIOAsync x) = onResult_ x (put . Left) put
-  --      putAsyncIOResult (AsyncIOPlumbing x) = x maskingState cancellationToken >>= putAsyncIOResult
-
+  catch = AsyncIOCatch
 
 
 -- | Run the synchronous part of an `AsyncIO` and then return an `Awaitable` that can be used to wait for completion of the synchronous part.
 async :: AsyncIO r -> AsyncIO (Awaitable r)
-async (AsyncIOSuccess x) = pure $ successfulAwaitable x
-async (AsyncIOFailure x) = pure $ failedAwaitable x
-async (AsyncIOIO x) = liftIO $ either failedAwaitable successfulAwaitable <$> try x
-async (AsyncIOAsync x) = pure x -- TODO caching
-async (AsyncIOPlumbing x) = mapPlumbing x (fmap async)
+async (AsyncIOCompleted x) = pure $ completedAwaitable x
+async (AsyncIOAwait x) = pure x
+async x = do
+  pool <- askPool
+  liftIO . atomically $ queueWork pool x
+
+askPool :: AsyncIO Pool
+askPool = AsyncIOAskPool
 
 await :: IsAwaitable r a => a -> AsyncIO r
-await = AsyncIOAsync . toAwaitable
+await = AsyncIOAwait . toAwaitable
 
 -- | Run an `AsyncIO` to completion and return the result.
 runAsyncIO :: AsyncIO r -> IO r
-runAsyncIO (AsyncIOSuccess x) = pure x
-runAsyncIO (AsyncIOFailure x) = throwIO x
-runAsyncIO (AsyncIOIO x) = x
-runAsyncIO (AsyncIOAsync x) = either throwIO pure =<< atomically (awaitSTM x)
-runAsyncIO (AsyncIOPlumbing x) = do
-  maskingState <- getMaskingState
-  withCancellationToken $ x maskingState >=> runAsyncIO
+runAsyncIO x = withDefaultPool $ \pool -> runAsyncIOWithPool pool x
+
+runAsyncIOWithPool :: Pool -> AsyncIO r -> IO r
+runAsyncIOWithPool pool x = do
+  stepResult <- stepAsyncIO pool x
+  case stepResult of
+    Left awaitable -> either throwIO pure =<< atomically (awaitSTM awaitable)
+    Right result -> pure result
+
+
+stepAsyncIO :: Pool -> AsyncIO r -> IO (Either (Awaitable r) r)
+stepAsyncIO pool = go
+  where
+    go :: AsyncIO r -> IO (Either (Awaitable r) r)
+    go (AsyncIOCompleted x) = Right <$> either throwIO pure x
+    go (AsyncIOIO x) = Right <$> x
+    go (AsyncIOAwait x) = pure $ Left x
+    go (AsyncIOBind x fn) = do
+      go x >>= \case
+        Left awaitable -> bindAwaitable awaitable (either throwM fn)
+        Right r -> go (fn r)
+    go (AsyncIOCatch x handler) = do
+      try (go x) >>= \case
+        Left ex -> go (handler ex)
+        Right (Left awaitable) -> bindAwaitable awaitable (either (handleSomeException handler) pure)
+        Right (Right r) -> pure $ Right r
+    go AsyncIOAskPool = pure $ Right pool
+
+    bindAwaitable :: Awaitable a -> (Either SomeException a -> AsyncIO r) -> IO (Either (Awaitable r) r)
+    bindAwaitable input work = fmap Left . atomically $ queueBlockedWork pool input work
+
+handleSomeException :: (Exception e, MonadThrow m) => (e -> m a) -> SomeException -> m a
+handleSomeException handler ex = maybe (throwM ex) handler (fromException ex)
 
 awaitResult :: AsyncIO (Awaitable r) -> AsyncIO r
 awaitResult = (await =<<)
 
+-- TODO rename
+-- AsyncIOPool
+-- AsyncPool
+-- ThreadPool
+-- AsyncIORuntime
+-- AsyncIOContext
+data Pool = Pool {
+  queue :: TVar [AsyncQueueItem]
+}
+
+data AsyncQueueItem = forall a. AsyncQueueItem (Awaitable a) (Either SomeException a -> AsyncIO ())
 
+withPool :: (Pool -> IO a) -> IO a
+withPool = undefined
 
--- ** Forking asyncs
+withDefaultPool :: (Pool -> IO a) -> IO a
+withDefaultPool = (=<< atomically defaultPool)
 
--- TODO
---class IsAsyncForkable m where
---  asyncThread :: m r -> AsyncIO r
+defaultPool :: STM Pool
+defaultPool = do
+  queue <- newTVar []
+  pure Pool {
+    queue
+  }
 
+queueWork :: Pool -> AsyncIO r -> STM (Awaitable r)
+queueWork pool work = queueBlockedWork pool (successfulAwaitable ()) (const work)
 
+queueBlockedWork :: Pool -> Awaitable a -> (Either SomeException a -> AsyncIO r) -> STM (Awaitable r)
+queueBlockedWork pool input work = do
+  resultVar <- newAsyncVarSTM
+  -- TODO masking state
+  let actualWork = try . work >=> putAsyncVarEither_ resultVar
+  undefined
+  pure $ toAwaitable resultVar
 
 
 -- * Awaiting multiple asyncs
 
 awaitEither :: (IsAwaitable ra a , IsAwaitable rb b) => a -> b -> AsyncIO (Either ra rb)
-awaitEither x y = AsyncIOPlumbing $ \_ _ -> AsyncIOAsync <$> awaitEitherPlumbing x y
+awaitEither x y = await =<< liftIO (awaitEitherPlumbing x y)
 
 awaitEitherPlumbing :: (IsAwaitable ra a , IsAwaitable rb b) => a -> b -> IO (Awaitable (Either ra rb))
 awaitEitherPlumbing x y = awaitableFromSTM $ peekEitherSTM x y
-- 
GitLab