diff --git a/src/Quasar/Async/V2.hs b/src/Quasar/Async/V2.hs index 5b4adfc2e1cc266dd8a66e8be6ce28a434ec8bc0..f0a34d38b5dd4d53edff37fdcbee84fc2f79ecec 100644 --- a/src/Quasar/Async/V2.hs +++ b/src/Quasar/Async/V2.hs @@ -89,7 +89,7 @@ asyncWithUnmask fn = do worker <- askIOWorker exChan <- askExceptionChannel rm <- askResourceManager - runSTM do + ensureSTM do as <- unmanagedAsyncWithUnmaskSTM (\unmask -> runReaderT (fn (liftUnmask unmask)) quasar) worker exChan attachResource rm as pure as diff --git a/src/Quasar/Monad.hs b/src/Quasar/Monad.hs index 589b663392ba2a37d7ff911adf699d4124fdc98d..c2123937ba8f0d55178b412e4b3df444fd6b43c5 100644 --- a/src/Quasar/Monad.hs +++ b/src/Quasar/Monad.hs @@ -11,8 +11,9 @@ module Quasar.Monad ( QuasarIO, QuasarSTM, + runQuasarIO, liftQuasarIO, - runQuasarSTM, + quasarAtomically, ) where import Control.Concurrent.STM @@ -23,6 +24,8 @@ import Quasar.Async.STMHelper import Quasar.Exceptions import Quasar.Prelude import Quasar.Resources.Disposer +import Quasar.Awaitable +import Quasar.Utils.ShortIO -- Invariant: the resource manager is disposed as soon as an exception is thrown to the channel @@ -49,9 +52,9 @@ quasarExceptionChannel (Quasar _ exChan _) = exChan quasarResourceManager :: Quasar -> ResourceManager quasarResourceManager (Quasar _ _ rm) = rm -newQuasar :: TIOWorker -> ExceptionChannel -> ResourceManager -> STM Quasar -newQuasar worker parentExChan parentRM = do - rm <- newResourceManagerSTM worker parentExChan +newQuasarSTM :: TIOWorker -> ExceptionChannel -> ResourceManager -> STM Quasar +newQuasarSTM worker parentExChan parentRM = do + rm <- newUnmanagedResourceManagerSTM worker parentExChan attachResource parentRM rm pure $ Quasar worker (ExceptionChannel (disposeOnException rm)) rm where @@ -60,36 +63,69 @@ newQuasar worker parentExChan parentRM = do disposeEventuallySTM_ rm throwToExceptionChannel parentExChan ex +newQuasar :: MonadQuasar m => m Quasar +newQuasar = do + worker <- askIOWorker + exChan <- askExceptionChannel + parentRM <- askResourceManager + ensureSTM $ newQuasarSTM worker exChan parentRM + class (MonadCatch m, MonadFix m) => MonadQuasar m where askQuasar :: m Quasar - runSTM :: STM a -> m a maskIfRequired :: m a -> m a + startShortIO :: ShortIO a -> m (Awaitable a) + ensureSTM :: STM a -> m a + ensureQuasarSTM :: QuasarSTM a -> m a type QuasarT = ReaderT Quasar type QuasarIO = QuasarT IO -type QuasarSTM = QuasarT STM + +newtype QuasarSTM a = QuasarSTM (ReaderT (Quasar, TVar (Awaitable ())) STM a) + deriving newtype (Functor, Applicative, Monad, MonadThrow, MonadCatch, MonadFix, Alternative) instance (MonadIO m, MonadMask m, MonadFix m) => MonadQuasar (QuasarT m) where askQuasar = ask - runSTM t = liftIO (atomically t) + ensureSTM t = liftIO (atomically t) maskIfRequired = mask_ - --- Overlaps the QuasartT/MonadIO-instance, because `MonadIO` _could_ be specified for `STM` (but that would be _very_ incorrect, so this is safe). -instance {-# OVERLAPS #-} MonadQuasar (QuasarT STM) where - askQuasar = ask - runSTM = lift + startShortIO fn = do + exChan <- askExceptionChannel + liftIO $ try (runShortIO fn) >>= \case + Left ex -> do + atomically $ throwToExceptionChannel exChan ex + pure $ throwM $ toException $ AsyncException ex + Right result -> pure $ pure result + ensureQuasarSTM = quasarAtomically + + +instance MonadQuasar QuasarSTM where + askQuasar = QuasarSTM (asks fst) + ensureSTM fn = QuasarSTM (lift fn) maskIfRequired = id + startShortIO fn = do + (quasar, effectAwaitableVar) <- QuasarSTM ask + let + worker = quasarIOWorker quasar + exChan = quasarExceptionChannel quasar + + ensureSTM do + awaitable <- startShortIOSTM fn worker exChan + -- Await in reverse order, so it is almost guaranteed this only retries once + modifyTVar effectAwaitableVar (awaitSuccessOrFailure awaitable *>) + pure awaitable + ensureQuasarSTM = id -- Overlappable so a QuasarT has priority over the base monad. instance {-# OVERLAPPABLE #-} MonadQuasar m => MonadQuasar (ReaderT r m) where askQuasar = lift askQuasar - runSTM t = lift (runSTM t) + ensureSTM t = lift (ensureSTM t) maskIfRequired fn = do x <- ask lift $ maskIfRequired (runReaderT fn x) + startShortIO t = lift (startShortIO t) + ensureQuasarSTM t = lift (ensureQuasarSTM t) -- TODO MonadQuasar instances for StateT, WriterT, RWST, MaybeT, ... @@ -109,7 +145,14 @@ liftQuasarIO fn = do quasar <- askQuasar liftIO $ runReaderT fn quasar -runQuasarSTM :: MonadQuasar m => QuasarSTM a -> m a -runQuasarSTM fn = do - quasar <- askQuasar - runSTM $ runReaderT fn quasar +runQuasarIO :: MonadIO m => Quasar -> QuasarIO a -> m a +runQuasarIO quasar fn = liftIO $ runReaderT fn quasar + +quasarAtomically :: (MonadQuasar m, MonadIO m) => QuasarSTM a -> m a +quasarAtomically (QuasarSTM fn) = do + quasar <- askQuasar + liftIO do + await =<< atomically do + effectAwaitableVar <- newTVar (pure ()) + result <- runReaderT fn (quasar, effectAwaitableVar) + (result <$) <$> readTVar effectAwaitableVar diff --git a/src/Quasar/Resources.hs b/src/Quasar/Resources.hs index 342de39b76dcb0da3f203279e3ef6d3a8f068f2c..888ebc5fd7231467bfa1cb2492a9f71036ed742a 100644 --- a/src/Quasar/Resources.hs +++ b/src/Quasar/Resources.hs @@ -25,7 +25,7 @@ module Quasar.Resources ( -- ** Resource manager ResourceManager, - newResourceManagerSTM, + newUnmanagedResourceManagerSTM, attachResource, ) where @@ -57,26 +57,26 @@ newSTMDisposer fn worker exChan = newPrimitiveDisposer disposeFn worker exChan registerResource :: (Resource a, MonadQuasar m) => a -> m () registerResource resource = do rm <- askResourceManager - runSTM $ attachResource rm resource + ensureSTM $ attachResource rm resource registerDisposeAction :: MonadQuasar m => IO () -> m () registerDisposeAction fn = do worker <- askIOWorker exChan <- askExceptionChannel rm <- askResourceManager - runSTM $ attachResource rm =<< newIODisposer fn worker exChan + ensureSTM $ attachResource rm =<< newIODisposer fn worker exChan registerDisposeTransaction :: MonadQuasar m => STM () -> m () registerDisposeTransaction fn = do worker <- askIOWorker exChan <- askExceptionChannel rm <- askResourceManager - runSTM $ attachResource rm =<< newSTMDisposer fn worker exChan + ensureSTM $ attachResource rm =<< newSTMDisposer fn worker exChan registerNewResource :: forall a m. (Resource a, MonadQuasar m) => m a -> m a registerNewResource fn = do rm <- askResourceManager - disposing <- isJust <$> runSTM (peekAwaitableSTM (isDisposing rm)) + disposing <- isJust <$> ensureSTM (peekAwaitableSTM (isDisposing rm)) -- Bail out before creating the resource _if possible_ when disposing $ throwM AlreadyDisposing @@ -92,7 +92,7 @@ registerNewResource fn = do disposeEventually :: (Resource r, MonadQuasar m) => r -> m (Awaitable ()) -disposeEventually res = runSTM $ disposeEventuallySTM res +disposeEventually res = ensureSTM $ disposeEventuallySTM res disposeEventually_ :: (Resource r, MonadQuasar m) => r -> m () -disposeEventually_ res = runSTM $ disposeEventuallySTM_ res +disposeEventually_ res = ensureSTM $ disposeEventuallySTM_ res diff --git a/src/Quasar/Resources/Disposer.hs b/src/Quasar/Resources/Disposer.hs index e2016510c64f858d1f350cdcdd3fe4d32f4727d9..0db63b06d8bc0d4c8987cda459ad9a4b32090b58 100644 --- a/src/Quasar/Resources/Disposer.hs +++ b/src/Quasar/Resources/Disposer.hs @@ -12,7 +12,7 @@ module Quasar.Resources.Disposer ( -- * Resource manager ResourceManager, - newResourceManagerSTM, + newUnmanagedResourceManagerSTM, attachResource, ) where @@ -142,8 +142,8 @@ instance Resource ResourceManager where getDisposer = ResourceManagerDisposer -newResourceManagerSTM :: TIOWorker -> ExceptionChannel -> STM ResourceManager -newResourceManagerSTM worker exChan = do +newUnmanagedResourceManagerSTM :: TIOWorker -> ExceptionChannel -> STM ResourceManager +newUnmanagedResourceManagerSTM worker exChan = do resourceManagerKey <- newUniqueSTM attachedResources <- newTVar mempty resourceManagerState <- newTVar (ResourceManagerNormal attachedResources worker exChan)