Skip to content
Snippets Groups Projects
Core.hs 9.43 KiB
module Quasar.Core (
  -- * Async
  IsAsync(..),
  Async,
  successfulAsync,
  failedAsync,
  completedAsync,

  -- * AsyncIO
  AsyncIO,
  async,
  await,
  runAsyncIO,
  awaitResult,

  -- * Async helpers
  mapAsync,

  -- * AsyncVar
  AsyncVar,
  newAsyncVar,
  putAsyncVar,

  -- * Disposable
  IsDisposable(..),
  disposeIO,
  Disposable,
  mkDisposable,
  synchronousDisposable,
  noDisposable,
) where

import Control.Monad.Catch
import Data.HashMap.Strict qualified as HM
import Quasar.Prelude

-- * Async

class IsAsync r a | a -> r where
  -- | Block until the promise is settled, then return its value or rethrow
  -- the exception it was settled with.
  --
  -- The default implementation registers a callback through `onResult_` and
  -- hands the result over via an `MVar`.
  wait :: a -> IO r
  wait promise = do
    resultVar <- newEmptyMVar
    onResult_ promise (storeFailure resultVar) (storeResult resultVar)
    outcome <- readMVar resultVar
    either throwIO pure outcome
    where
      -- Callback exception handler: records the exception unless a result
      -- has already been stored.
      storeFailure :: MVar (Either SomeException r) -> SomeException -> IO ()
      storeFailure resultVar ex = void $ tryPutMVar resultVar (Left ex)

      -- Result callback: a second invocation is reported as an error.
      storeResult :: MVar (Either SomeException r) -> Either SomeException r -> IO ()
      storeResult resultVar outcome = do
        stored <- tryPutMVar resultVar outcome
        unless stored $ fail "Callback was called multiple times"

  -- | Inspect the promise: `Nothing` while it is not yet settled, otherwise
  -- the settled result.
  peekAsync :: a -> IO (Maybe (Either SomeException r))

  -- | Register a callback that is invoked once the promise is settled.
  -- If the promise has already been settled, the callback is invoked immediately.
  --
  -- The returned `Disposable` can be used to deregister the callback.
  onResult
    :: a
    -- ^ async
    -> (SomeException -> IO ())
    -- ^ callback exception handler
    -> (Either SomeException r -> IO ())
    -- ^ callback
    -> IO Disposable

  -- | Variant of `onResult` that discards the `Disposable`.
  onResult_
    :: a
    -> (SomeException -> IO ())
    -> (Either SomeException r -> IO ())
    -> IO ()
  onResult_ promise exceptionHandler callback =
    void $ onResult promise exceptionHandler callback

  -- | Wrap the value in the existential `Async` type.
  toAsync :: a -> Async r
  toAsync promise = SomeAsync promise


-- | Existential wrapper that hides the concrete type of any `IsAsync`
-- instance producing an @r@.
data Async r = forall a. IsAsync r a => SomeAsync a

-- Every operation is forwarded to the wrapped value.
instance IsAsync r (Async r) where
  wait (SomeAsync x) = wait x
  onResult (SomeAsync x) y = onResult x y
  onResult_ (SomeAsync x) y = onResult_ x y
  peekAsync (SomeAsync x) = peekAsync x
  -- An `Async` already is the wrapper type. The class default (`SomeAsync`)
  -- would add another layer of indirection on every `toAsync` / `await`.
  toAsync = id



-- | An `IsAsync` implementation that is settled from the start.
newtype CompletedAsync r = CompletedAsync (Either SomeException r)
instance IsAsync r (CompletedAsync r) where
  wait (CompletedAsync result) = case result of
    Left ex -> throwIO ex
    Right value -> pure value
  -- The result is already known, so the callback runs right away and there
  -- is nothing to deregister afterwards.
  onResult (CompletedAsync result) exceptionHandler callback = do
    callback result `catch` exceptionHandler
    pure noDisposable
  peekAsync (CompletedAsync result) = pure (Just result)

-- | Create an `Async` that is already settled with the given result.
completedAsync :: Either SomeException r -> Async r
completedAsync result = toAsync (CompletedAsync result)

-- | Create an `Async` that is already settled with a value.
successfulAsync :: r -> Async r
successfulAsync value = completedAsync (Right value)

-- | Create an `Async` that is already settled with an exception.
failedAsync :: SomeException -> Async r
failedAsync ex = completedAsync (Left ex)


-- * AsyncIO

-- | A computation that can perform `IO` and wait for `Async` values.
data AsyncIO r
  -- | Already finished with a value.
  = AsyncIOSuccess r
  -- | Already failed with an exception.
  | AsyncIOFailure SomeException
  -- | An `IO` action that produces the result (this is what `liftIO` creates).
  | AsyncIOIO (IO r)
  -- | Waits for the result of an `Async` (this is what `await` creates).
  | AsyncIOAsync (Async r)
  -- | An `IO` action that produces the next `AsyncIO` to continue with.
  | AsyncIOPlumbing (IO (AsyncIO r))

instance Functor AsyncIO where
  -- Failures pass through unchanged; every other case maps the eventual result.
  fmap fn asyncIO = case asyncIO of
    AsyncIOSuccess value -> AsyncIOSuccess (fn value)
    AsyncIOFailure ex -> AsyncIOFailure ex
    AsyncIOIO action -> AsyncIOIO (fmap fn action)
    AsyncIOAsync promise -> bindAsync promise (\value -> pure (fn value))
    AsyncIOPlumbing plumbing -> AsyncIOPlumbing (fn <<$>> plumbing)

-- Both combinators are sequential: the second computation is only
-- looked at after the first one has produced a value.
instance Applicative AsyncIO where
  pure value = AsyncIOSuccess value
  pf <*> px = pf >>= \f -> fmap f px
  liftA2 f px py = px >>= \x -> fmap (f x) py
instance Monad AsyncIO where
  (>>=) :: forall a b. AsyncIO a -> (a -> AsyncIO b) -> AsyncIO b
  asyncIO >>= continuation = case asyncIO of
    AsyncIOSuccess value -> continuation value
    AsyncIOFailure ex -> AsyncIOFailure ex
    -- Exceptions thrown by the IO action are turned into an `AsyncIOFailure`.
    AsyncIOIO action -> AsyncIOPlumbing (either AsyncIOFailure continuation <$> try action)
    AsyncIOAsync promise -> bindAsync promise continuation
    -- Attach the continuation to whatever step the plumbing action produces.
    AsyncIOPlumbing plumbing -> AsyncIOPlumbing (fmap (>>= continuation) plumbing)

instance MonadIO AsyncIO where
  liftIO action = AsyncIOIO action

instance MonadThrow AsyncIO where
  throwM ex = AsyncIOFailure (toException ex)

instance MonadCatch AsyncIO where
  catch :: Exception e => AsyncIO a -> (e -> AsyncIO a) -> AsyncIO a
  catch asyncIO handler = case asyncIO of
    AsyncIOSuccess _ -> asyncIO
    -- Only exceptions of the handler's type are handled; others stay failures.
    AsyncIOFailure ex -> maybe asyncIO handler (fromException ex)
    AsyncIOIO action -> AsyncIOIO (try action) >>= handleEither handler
    AsyncIOAsync promise -> bindAsyncCatch promise (handleEither handler)
    -- Install the handler on the step produced by the plumbing action.
    AsyncIOPlumbing plumbing -> AsyncIOPlumbing ((`catch` handler) <$> plumbing)

-- | Continue with a successful result, or pass a failure to the handler if
-- the exception has the type the handler accepts. Other exceptions are
-- propagated as `AsyncIOFailure`.
handleEither :: Exception e => (e -> AsyncIO a) -> Either SomeException a -> AsyncIO a
handleEither handler result = case result of
  Right value -> pure value
  Left ex -> case fromException ex of
    Just matching -> handler matching
    Nothing -> AsyncIOFailure ex

-- | Continue with the value of an `Async`; a failed `Async` becomes an
-- `AsyncIOFailure` without running the continuation.
bindAsync :: forall a b. Async a -> (a -> AsyncIO b) -> AsyncIO b
bindAsync promise continuation = bindAsyncCatch promise $ \case
  Left ex -> AsyncIOFailure ex
  Right value -> continuation value

-- | Continue with the result (success or failure) of an `Async`.
--
-- A fresh `AsyncVar` receives the outcome of the continuation; the returned
-- `AsyncIO` awaits that variable.
bindAsyncCatch :: forall a b. Async a -> (Either SomeException a -> AsyncIO b) -> AsyncIO b
bindAsyncCatch promise continuation = AsyncIOPlumbing $ do
  resultVar <- newAsyncVar
  withResult promise resultVar runContinuation
  pure (await resultVar)
  where
    -- Run the continuation on the settled result and forward its outcome.
    runContinuation :: (Either SomeException b -> IO ()) -> Either SomeException a -> IO ()
    runContinuation put result = putAsyncIOResult put (continuation result)

-- | Register a callback on an `Async`. The callback receives a function that
-- fulfills the given `AsyncVar`; exceptions thrown by the callback fail the
-- variable.
withResult :: Async a -> AsyncVar b -> ((Either SomeException b -> IO ()) -> Either SomeException a -> IO ()) -> IO ()
withResult promise var fn = onResult_ promise onCallbackException callback
  where
    onCallbackException = failAsyncVar var
    callback = fn (putAsyncVarEither var)

-- | Run an `AsyncIO` and hand its result to the given function. When the
-- computation waits on an `Async`, that happens via a registered callback.
putAsyncIOResult :: (Either SomeException a -> IO ()) -> AsyncIO a -> IO ()
putAsyncIOResult put asyncIO = case asyncIO of
  AsyncIOSuccess value -> put (Right value)
  AsyncIOFailure ex -> put (Left ex)
  AsyncIOIO action -> do
    result <- try action
    put result
  AsyncIOAsync promise -> onResult_ promise (\ex -> put (Left ex)) put
  AsyncIOPlumbing plumbing -> do
    next <- plumbing
    putAsyncIOResult put next



-- | Wrap the result of an `AsyncIO` in an `Async`.
--
-- NOTE: the current implementation maps the final result into an
-- already-completed `Async` (via `successfulAsync`), so the returned `AsyncIO`
-- only finishes once the whole computation has finished.
async :: AsyncIO r -> AsyncIO (Async r)
async asyncIO = successfulAsync <$> asyncIO

-- | Wait for the result of an `IsAsync` value inside `AsyncIO`.
await :: IsAsync r a => a -> AsyncIO r
await promise = AsyncIOAsync (toAsync promise)

-- | Execute an `AsyncIO` until it has a result and return that result.
-- Failures are thrown as exceptions; waiting on an `Async` uses `wait`.
runAsyncIO :: AsyncIO r -> IO r
runAsyncIO asyncIO = case asyncIO of
  AsyncIOSuccess value -> pure value
  AsyncIOFailure ex -> throwIO ex
  AsyncIOIO action -> action
  AsyncIOAsync promise -> wait promise
  AsyncIOPlumbing plumbing -> do -- TODO error handling
    next <- plumbing
    runAsyncIO next

-- | Run a computation that produces an `Async`, then await that `Async`.
awaitResult :: AsyncIO (Async r) -> AsyncIO r
awaitResult asyncIO = asyncIO >>= await


-- | Apply a function to the result of an `Async` (composed from `await`,
-- `fmap` and `async`).
mapAsync :: (a -> b) -> Async a -> AsyncIO (Async b)
-- FIXME: don't actually attach a function if the resulting async is not used
-- maybe use `Weak`? When `Async b` is GC'ed, the handler is detached from `Async a`
mapAsync fn promise = async (fn <$> await promise)


-- ** Forking asyncs

-- TODO
--class IsAsyncForkable m where
--  asyncThread :: m r -> AsyncIO r


-- * Async helpers

-- ** AsyncVar

-- | The default implementation for an `Async` that can be fulfilled later.
newtype AsyncVar r = AsyncVar (MVar (AsyncVarState r))
-- | State of an `AsyncVar`: either the final result, or the callbacks that are
-- still registered, each stored with its exception handler and keyed by a `Unique`.
data AsyncVarState r = AsyncVarCompleted (Either SomeException r) | AsyncVarOpen (HM.HashMap Unique (Either SomeException r -> IO (), SomeException -> IO ()))

instance IsAsync r (AsyncVar r) where
  -- Returns the result if the variable has been completed, `Nothing` while
  -- it is still open.
  peekAsync :: AsyncVar r -> IO (Maybe (Either SomeException r))
  peekAsync (AsyncVar mvar) = readMVar mvar >>= pure . \case
    AsyncVarCompleted x -> Just x
    AsyncVarOpen _ -> Nothing

  -- While the variable is open the callback is stored (together with its
  -- exception handler) and the returned `Disposable` removes it again.
  -- If the variable is already completed the callback is invoked right away.
  onResult :: AsyncVar r -> (SomeException -> IO ()) -> (Either SomeException r -> IO ()) -> IO Disposable
  onResult (AsyncVar mvar) callbackExceptionHandler callback = do
    -- The state update only decides what to do. The callback itself runs
    -- after the MVar has been released, so a callback that accesses this
    -- variable again cannot deadlock on it.
    finish <- modifyMVar mvar $ \case
      AsyncVarOpen callbacks -> do
        key <- newUnique
        pure (AsyncVarOpen (HM.insert key (callback, callbackExceptionHandler) callbacks), pure (removeHandler key))
      x@(AsyncVarCompleted value) ->
        -- Exceptions thrown by the callback go to its exception handler,
        -- the same way as for callbacks invoked on completion.
        pure (x, noDisposable <$ (callback value `catch` callbackExceptionHandler))
    finish
    where
      removeHandler :: Unique -> Disposable
      removeHandler key = synchronousDisposable $ modifyMVar_ mvar $ pure . \case
        x@(AsyncVarCompleted _) -> x
        AsyncVarOpen x -> AsyncVarOpen $ HM.delete key x


-- | Create an `AsyncVar` that is open and has no callbacks registered.
newAsyncVar :: MonadIO m => m (AsyncVar r)
newAsyncVar = liftIO $ do
  stateVar <- newMVar (AsyncVarOpen HM.empty)
  pure (AsyncVar stateVar)

-- | Complete an `AsyncVar` with a value.
putAsyncVar :: MonadIO m => AsyncVar a -> a -> m ()
putAsyncVar var value = putAsyncVarEither var (Right value)

-- | Complete an `AsyncVar` with an exception.
failAsyncVar :: MonadIO m => AsyncVar a -> SomeException -> m ()
failAsyncVar var ex = putAsyncVarEither var (Left ex)

-- | Complete an `AsyncVar` with a result and invoke all registered callbacks.
--
-- Fails if the variable has already been completed.
putAsyncVarEither :: MonadIO m => AsyncVar a -> Either SomeException a -> m ()
putAsyncVarEither (AsyncVar mvar) value = liftIO $ do
  mask $ \restore -> do
    takeMVar mvar >>= \case
      x@(AsyncVarCompleted _) -> do
        putMVar mvar x
        fail "An AsyncVar can only be fulfilled once"
      AsyncVarOpen callbacksMap -> do
        let
          -- NOTE disposing a callback while it is called is a deadlock
          runCallbacks = forM_ (HM.elems callbacksMap) $ \(callback, callbackExceptionHandler) ->
            restore (callback value) `catch` callbackExceptionHandler
        -- The MVar is empty while the callbacks run. `finally` guarantees the
        -- completed state is stored even if an exception handler itself
        -- throws; otherwise every later access to the variable would block
        -- forever.
        runCallbacks `finally` putMVar mvar (AsyncVarCompleted value)


-- * Disposable

class IsDisposable a where
  -- TODO document laws: must not throw exceptions, is idempotent

  -- | Release the resource.
  dispose :: a -> AsyncIO ()

  -- | Convert to the generic `Disposable` type. The default wraps `dispose`
  -- with `mkDisposable`.
  toDisposable :: a -> Disposable
  toDisposable resource = mkDisposable (dispose resource)

-- | Run `dispose` for a resource directly in `IO` (via `runAsyncIO`).
disposeIO :: IsDisposable a => a -> IO ()
disposeIO resource = runAsyncIO (dispose resource)

-- Disposes the contained value, if there is one.
instance IsDisposable a => IsDisposable (Maybe a) where
  dispose resource = forM_ resource dispose


-- | A generic disposable resource, represented by the `AsyncIO` action that disposes it.
newtype Disposable = Disposable (AsyncIO ())

instance IsDisposable Disposable where
  -- Disposing runs the wrapped action.
  dispose (Disposable action) = action
  -- Already a `Disposable`, so no wrapping is needed.
  toDisposable disposable = disposable

-- Combining two disposables yields one that disposes the left operand and
-- then the right operand.
instance Semigroup Disposable where
  first <> second = mkDisposable (liftA2 (<>) disposeFirst disposeSecond)
    where
      disposeFirst = dispose first
      disposeSecond = dispose second

instance Monoid Disposable where
  -- The empty disposable does nothing.
  mempty = mkDisposable (pure ())
  -- Disposes every element of the list in order.
  mconcat = mkDisposable . traverse_ dispose


-- | Create a `Disposable` from the `AsyncIO` action that performs the disposal.
mkDisposable :: AsyncIO () -> Disposable
mkDisposable action = Disposable action

-- | Create a `Disposable` from a plain `IO` action.
synchronousDisposable :: IO () -> Disposable
synchronousDisposable action = mkDisposable (liftIO action)

-- | A `Disposable` with nothing to dispose; this is the `Monoid` identity (`mempty`).
noDisposable :: Disposable
noDisposable = mempty