Skip to content
Snippets Groups Projects
Core.hs 5.19 KiB
module Quasar.Core (
  -- * AsyncIO
  AsyncIO,
  async,
  await,
  runAsyncIO,
  awaitResult,
) where

import Control.Concurrent (forkIOWithUnmask)
import Control.Concurrent.STM
import Control.Exception (MaskingState(..), getMaskingState)
import Control.Monad.Catch
import Data.Maybe (isJust)
import Data.Void (absurd)
import Quasar.Awaitable
import Quasar.Prelude


-- * AsyncIO

data AsyncIO r
  = AsyncIOCompleted (Either SomeException r)
  | AsyncIOIO (IO r)
  | AsyncIOAwait (Awaitable r)
  | forall a. AsyncIOBind (AsyncIO a) (a -> AsyncIO r)
  | forall e. Exception e => AsyncIOCatch (AsyncIO r) (e -> AsyncIO r)
  | r ~ Pool => AsyncIOAskPool

instance Functor AsyncIO where
  fmap fn (AsyncIOCompleted x) = AsyncIOCompleted (fn <$> x)
  fmap fn (AsyncIOIO x) = AsyncIOIO (fn <$> x)
  fmap fn (AsyncIOAwait x) = AsyncIOAwait (fn <$> x)
  fmap fn (AsyncIOBind x y) = AsyncIOBind x (fn <<$>> y)
  fmap fn (AsyncIOCatch x y) = AsyncIOCatch (fn <$> x) (fn <<$>> y)
  fmap fn AsyncIOAskPool = AsyncIOBind AsyncIOAskPool (pure . fn)

instance Applicative AsyncIO where
  pure = AsyncIOCompleted . Right
  (<*>) pf px = pf >>= \f -> f <$> px
  liftA2 f px py = px >>= \x -> f x <$> py

instance Monad AsyncIO where
  (>>=) :: forall a b. AsyncIO a -> (a -> AsyncIO b) -> AsyncIO b
  x >>= fn = AsyncIOBind x fn

instance MonadIO AsyncIO where
  liftIO = AsyncIOIO

instance MonadThrow AsyncIO where
  throwM = AsyncIOCompleted . Left . toException

instance MonadCatch AsyncIO where
  catch :: Exception e => AsyncIO a -> (e -> AsyncIO a) -> AsyncIO a
  catch = AsyncIOCatch


-- | Run the synchronous part of an `AsyncIO` and then return an `Awaitable` that can be used to wait for completion of the synchronous part.
async :: AsyncIO r -> AsyncIO (Awaitable r)
async (AsyncIOCompleted x) = pure $ completedAwaitable x
async (AsyncIOAwait x) = pure x
async x = do
  pool <- askPool
  liftIO . atomically $ queueWork pool x

askPool :: AsyncIO Pool
askPool = AsyncIOAskPool

await :: IsAwaitable r a => a -> AsyncIO r
await = AsyncIOAwait . toAwaitable

-- | Run an `AsyncIO` to completion and return the result.
runAsyncIO :: AsyncIO r -> IO r
runAsyncIO x = withDefaultPool $ \pool -> runAsyncIOWithPool pool x

runAsyncIOWithPool :: Pool -> AsyncIO r -> IO r
runAsyncIOWithPool pool x = do
  stepResult <- stepAsyncIO pool x
  case stepResult of
    Left awaitable -> either throwIO pure =<< atomically (awaitSTM awaitable)
    Right result -> pure result


stepAsyncIO :: Pool -> AsyncIO r -> IO (Either (Awaitable r) r)
stepAsyncIO pool = go
  where
    go :: AsyncIO r -> IO (Either (Awaitable r) r)
    go (AsyncIOCompleted x) = Right <$> either throwIO pure x
    go (AsyncIOIO x) = Right <$> x
    go (AsyncIOAwait x) = pure $ Left x
    go (AsyncIOBind x fn) = do
      go x >>= \case
        Left awaitable -> bindAwaitable awaitable (either throwM fn)
        Right r -> go (fn r)
    go (AsyncIOCatch x handler) = do
      try (go x) >>= \case
        Left ex -> go (handler ex)
        Right (Left awaitable) -> bindAwaitable awaitable (either (handleSomeException handler) pure)
        Right (Right r) -> pure $ Right r
    go AsyncIOAskPool = pure $ Right pool

    bindAwaitable :: Awaitable a -> (Either SomeException a -> AsyncIO r) -> IO (Either (Awaitable r) r)
    bindAwaitable input work = fmap Left . atomically $ queueBlockedWork pool input work

handleSomeException :: (Exception e, MonadThrow m) => (e -> m a) -> SomeException -> m a
handleSomeException handler ex = maybe (throwM ex) handler (fromException ex)

awaitResult :: AsyncIO (Awaitable r) -> AsyncIO r
awaitResult = (await =<<)

-- TODO rename
-- AsyncIOPool
-- AsyncPool
-- ThreadPool
-- AsyncIORuntime
-- AsyncIOContext
data Pool = Pool {
  queue :: TVar [AsyncQueueItem]
}

data AsyncQueueItem = forall a. AsyncQueueItem (Awaitable a) (Either SomeException a -> AsyncIO ())

withPool :: (Pool -> IO a) -> IO a
withPool = undefined

withDefaultPool :: (Pool -> IO a) -> IO a
withDefaultPool = (=<< atomically defaultPool)

defaultPool :: STM Pool
defaultPool = do
  queue <- newTVar []
  pure Pool {
    queue
  }

queueWork :: Pool -> AsyncIO r -> STM (Awaitable r)
queueWork pool work = queueBlockedWork pool (successfulAwaitable ()) (const work)

queueBlockedWork :: Pool -> Awaitable a -> (Either SomeException a -> AsyncIO r) -> STM (Awaitable r)
queueBlockedWork pool input work = do
  resultVar <- newAsyncVarSTM
  -- TODO masking state
  let actualWork = try . work >=> putAsyncVarEither_ resultVar
  undefined
  pure $ toAwaitable resultVar


-- * Awaiting multiple asyncs

awaitEither :: (IsAwaitable ra a , IsAwaitable rb b) => a -> b -> AsyncIO (Either ra rb)
awaitEither x y = await =<< liftIO (awaitEitherPlumbing x y)

awaitEitherPlumbing :: (IsAwaitable ra a , IsAwaitable rb b) => a -> b -> IO (Awaitable (Either ra rb))
awaitEitherPlumbing x y = awaitableFromSTM $ peekEitherSTM x y

peekEitherSTM :: (IsAwaitable ra a , IsAwaitable rb b) => a -> b -> STM (Maybe (Either SomeException (Either ra rb)))
peekEitherSTM x y =
  peekSTM x >>= \case
    Just (Left ex) -> pure (Just (Left ex))
    Just (Right r) -> pure (Just (Right (Left r)))
    Nothing -> peekSTM y >>= \case
      Just (Left ex) -> pure (Just (Left ex))
      Just (Right r) -> pure (Just (Right (Right r)))
      Nothing -> pure Nothing