-
Jens Nolte authored
Co-authored-by:
Jan Beinke <git@janbeinke.com>
Jens Nolte authoredCo-authored-by:
Jan Beinke <git@janbeinke.com>
Core.hs 9.43 KiB
module Quasar.Core (
-- * Async
IsAsync(..),
Async,
successfulAsync,
failedAsync,
completedAsync,
-- * AsyncIO
AsyncIO,
async,
await,
runAsyncIO,
awaitResult,
-- * Async helpers
mapAsync,
-- * AsyncVar
AsyncVar,
newAsyncVar,
putAsyncVar,
-- * Disposable
IsDisposable(..),
disposeIO,
Disposable,
mkDisposable,
synchronousDisposable,
noDisposable,
) where
import Control.Monad.Catch
import Data.HashMap.Strict qualified as HM
import Quasar.Prelude
-- * Async
class IsAsync r a | a -> r where
-- | Wait until the promise is settled and return the result.
wait :: a -> IO r
wait x = do
mvar <- newEmptyMVar
onResult_ x (void . tryPutMVar mvar . Left) (resultCallback mvar)
readMVar mvar >>= either throwIO pure
where
resultCallback :: MVar (Either SomeException r) -> Either SomeException r -> IO ()
resultCallback mvar result = do
success <- tryPutMVar mvar result
unless success $ fail "Callback was called multiple times"
peekAsync :: a -> IO (Maybe (Either SomeException r))
-- | Register a callback, that will be called once the promise is settled.
-- If the promise is already settled, the callback will be called immediately instead.
--
-- The returned `Disposable` can be used to deregister the callback.
onResult
:: a
-- ^ async
-> (SomeException -> IO ())
-- ^ callback exception handler
-> (Either SomeException r -> IO ())
-- ^ callback
-> IO Disposable
onResult_
:: a
-> (SomeException -> IO ())
-> (Either SomeException r -> IO ())
-> IO ()
onResult_ x y = void . onResult x y
toAsync :: a -> Async r
toAsync = SomeAsync
data Async r = forall a. IsAsync r a => SomeAsync a
instance IsAsync r (Async r) where
wait (SomeAsync x) = wait x
onResult (SomeAsync x) y = onResult x y
onResult_ (SomeAsync x) y = onResult_ x y
peekAsync (SomeAsync x) = peekAsync x
newtype CompletedAsync r = CompletedAsync (Either SomeException r)
instance IsAsync r (CompletedAsync r) where
wait (CompletedAsync value) = either throwIO pure value
onResult (CompletedAsync value) callbackExceptionHandler callback = noDisposable <$ (callback value `catch` callbackExceptionHandler)
peekAsync (CompletedAsync value) = pure $ Just value
completedAsync :: Either SomeException r -> Async r
completedAsync = toAsync . CompletedAsync
successfulAsync :: r -> Async r
successfulAsync = completedAsync . Right
failedAsync :: SomeException -> Async r
failedAsync = completedAsync . Left
-- * AsyncIO
data AsyncIO r
= AsyncIOSuccess r
| AsyncIOFailure SomeException
| AsyncIOIO (IO r)
| AsyncIOAsync (Async r)
| AsyncIOPlumbing (IO (AsyncIO r))
instance Functor AsyncIO where
fmap fn (AsyncIOSuccess x) = AsyncIOSuccess (fn x)
fmap _ (AsyncIOFailure x) = AsyncIOFailure x
fmap fn (AsyncIOIO x) = AsyncIOIO (fn <$> x)
fmap fn (AsyncIOAsync x) = bindAsync x (pure . fn)
fmap fn (AsyncIOPlumbing x) = AsyncIOPlumbing (fn <<$>> x)
instance Applicative AsyncIO where
pure = AsyncIOSuccess
(<*>) pf px = pf >>= \f -> f <$> px
liftA2 f px py = px >>= \x -> f x <$> py
instance Monad AsyncIO where
(>>=) :: forall a b. AsyncIO a -> (a -> AsyncIO b) -> AsyncIO b
(>>=) (AsyncIOSuccess x) fn = fn x
(>>=) (AsyncIOFailure x) _ = AsyncIOFailure x
(>>=) (AsyncIOIO x) fn = AsyncIOPlumbing $ either AsyncIOFailure fn <$> try x
(>>=) (AsyncIOAsync x) fn = bindAsync x fn
(>>=) (AsyncIOPlumbing x) fn = AsyncIOPlumbing $ (>>= fn) <$> x
instance MonadIO AsyncIO where
liftIO = AsyncIOIO
instance MonadThrow AsyncIO where
throwM = AsyncIOFailure . toException
instance MonadCatch AsyncIO where
catch :: Exception e => AsyncIO a -> (e -> AsyncIO a) -> AsyncIO a
catch x@(AsyncIOSuccess _) _ = x
catch x@(AsyncIOFailure ex) handler = maybe x handler (fromException ex)
catch (AsyncIOIO x) handler = AsyncIOIO (try x) >>= handleEither handler
catch (AsyncIOAsync x) handler = bindAsyncCatch x (handleEither handler)
catch (AsyncIOPlumbing x) handler = AsyncIOPlumbing $ x >>= pure . (`catch` handler)
handleEither :: Exception e => (e -> AsyncIO a) -> Either SomeException a -> AsyncIO a
handleEither handler (Left ex) = maybe (AsyncIOFailure ex) handler (fromException ex)
handleEither _ (Right r) = pure r
bindAsync :: forall a b. Async a -> (a -> AsyncIO b) -> AsyncIO b
bindAsync x fn = bindAsyncCatch x (either (AsyncIOFailure) fn)
bindAsyncCatch :: forall a b. Async a -> (Either SomeException a -> AsyncIO b) -> AsyncIO b
bindAsyncCatch x fn = AsyncIOPlumbing $ newAsyncVar >>= bindAsync'
where
bindAsync' resultVar = do
withResult x resultVar step
pure $ await resultVar
step :: (Either SomeException b -> IO ()) -> Either SomeException a -> IO ()
step put = putAsyncIOResult put . fn
withResult :: Async a -> AsyncVar b -> ((Either SomeException b -> IO ()) -> Either SomeException a -> IO ()) -> IO ()
withResult x var fn = onResult_ x (failAsyncVar var) (fn (putAsyncVarEither var))
putAsyncIOResult :: (Either SomeException a -> IO ()) -> AsyncIO a -> IO ()
putAsyncIOResult put (AsyncIOSuccess x) = put (Right x)
putAsyncIOResult put (AsyncIOFailure x) = put (Left x)
putAsyncIOResult put (AsyncIOIO x) = try x >>= put
putAsyncIOResult put (AsyncIOAsync x) = onResult_ x (put . Left) put
putAsyncIOResult put (AsyncIOPlumbing x) = x >>= putAsyncIOResult put
-- | Run the synchronous part of an `AsyncIO` and then return an `Async` that can be used to wait for completion of the synchronous part.
async :: AsyncIO r -> AsyncIO (Async r)
async = fmap successfulAsync
await :: IsAsync r a => a -> AsyncIO r
await = AsyncIOAsync . toAsync
-- | Run an `AsyncIO` to completion and return the result.
runAsyncIO :: AsyncIO r -> IO r
runAsyncIO (AsyncIOSuccess x) = pure x
runAsyncIO (AsyncIOFailure x) = throwIO x
runAsyncIO (AsyncIOIO x) = x
runAsyncIO (AsyncIOAsync x) = wait x
runAsyncIO (AsyncIOPlumbing x) = x >>= runAsyncIO -- TODO error handling
awaitResult :: AsyncIO (Async r) -> AsyncIO r
awaitResult = (await =<<)
mapAsync :: (a -> b) -> Async a -> AsyncIO (Async b)
-- FIXME: don't actually attach a function if the resulting async is not used
-- maybe use `Weak`? When `Async b` is GC'ed, the handler is detached from `Async a`
mapAsync fn = async . fmap fn . await
-- ** Forking asyncs
-- TODO
--class IsAsyncForkable m where
-- asyncThread :: m r -> AsyncIO r
-- * Async helpers
-- ** AsyncVar
-- | The default implementation for a `Async` that can be fulfilled later.
newtype AsyncVar r = AsyncVar (MVar (AsyncVarState r))
data AsyncVarState r = AsyncVarCompleted (Either SomeException r) | AsyncVarOpen (HM.HashMap Unique (Either SomeException r -> IO (), SomeException -> IO ()))
instance IsAsync r (AsyncVar r) where
peekAsync :: AsyncVar r -> IO (Maybe (Either SomeException r))
peekAsync (AsyncVar mvar) = readMVar mvar >>= pure . \case
AsyncVarCompleted x -> Just x
AsyncVarOpen _ -> Nothing
onResult :: AsyncVar r -> (SomeException -> IO ()) -> (Either SomeException r -> IO ()) -> IO Disposable
onResult (AsyncVar mvar) callbackExceptionHandler callback =
modifyMVar mvar $ \case
AsyncVarOpen callbacks -> do
key <- newUnique
pure (AsyncVarOpen (HM.insert key (callback, callbackExceptionHandler) callbacks), removeHandler key)
x@(AsyncVarCompleted value) -> (x, noDisposable) <$ callback value
where
removeHandler :: Unique -> Disposable
removeHandler key = synchronousDisposable $ modifyMVar_ mvar $ pure . \case
x@(AsyncVarCompleted _) -> x
AsyncVarOpen x -> AsyncVarOpen $ HM.delete key x
newAsyncVar :: MonadIO m => m (AsyncVar r)
newAsyncVar = liftIO $ AsyncVar <$> newMVar (AsyncVarOpen HM.empty)
putAsyncVar :: MonadIO m => AsyncVar a -> a -> m ()
putAsyncVar var = putAsyncVarEither var . Right
failAsyncVar :: MonadIO m => AsyncVar a -> SomeException -> m ()
failAsyncVar var = putAsyncVarEither var . Left
putAsyncVarEither :: MonadIO m => AsyncVar a -> Either SomeException a -> m ()
putAsyncVarEither (AsyncVar mvar) value = liftIO $ do
mask $ \restore -> do
takeMVar mvar >>= \case
x@(AsyncVarCompleted _) -> do
putMVar mvar x
fail "An AsyncVar can only be fulfilled once"
AsyncVarOpen callbacksMap -> do
let callbacks = HM.elems callbacksMap
-- NOTE disposing a callback while it is called is a deadlock
forM_ callbacks $ \(callback, callbackExceptionHandler) ->
restore (callback value) `catch` callbackExceptionHandler
putMVar mvar (AsyncVarCompleted value)
-- * Disposable
class IsDisposable a where
-- TODO document laws: must not throw exceptions, is idempotent
-- | Dispose a resource.
dispose :: a -> AsyncIO ()
toDisposable :: a -> Disposable
toDisposable = mkDisposable . dispose
-- | Dispose a resource in the IO monad.
disposeIO :: IsDisposable a => a -> IO ()
disposeIO = runAsyncIO . dispose
instance IsDisposable a => IsDisposable (Maybe a) where
dispose = mapM_ dispose
newtype Disposable = Disposable (AsyncIO ())
instance IsDisposable Disposable where
dispose (Disposable fn) = fn
toDisposable = id
instance Semigroup Disposable where
x <> y = mkDisposable $ liftA2 (<>) (dispose x) (dispose y)
instance Monoid Disposable where
mempty = mkDisposable $ pure ()
mconcat disposables = mkDisposable $ traverse_ dispose disposables
mkDisposable :: AsyncIO () -> Disposable
mkDisposable = Disposable
synchronousDisposable :: IO () -> Disposable
synchronousDisposable = mkDisposable . liftIO
noDisposable :: Disposable
noDisposable = mempty