From a2c0091d089e9940b60dbf14b83bb0cdb02ef901 Mon Sep 17 00:00:00 2001 From: Jens Nolte <git@queezle.net> Date: Sat, 12 Feb 2022 02:53:51 +0100 Subject: [PATCH] Add ShortIO newtype to track IO invariant for TIOWorker --- quasar.cabal | 1 + src/Quasar/Async/STMHelper.hs | 13 +++++----- src/Quasar/Async/V2.hs | 8 +++--- src/Quasar/Resources/Disposer.hs | 34 ++++++++++++------------ src/Quasar/Utils/ShortIO.hs | 44 ++++++++++++++++++++++++++++++++ 5 files changed, 74 insertions(+), 26 deletions(-) create mode 100644 src/Quasar/Utils/ShortIO.hs diff --git a/quasar.cabal b/quasar.cabal index dc5df0c..8e62dac 100644 --- a/quasar.cabal +++ b/quasar.cabal @@ -110,6 +110,7 @@ library Quasar.Utils.TOnce other-modules: Quasar.Resources.Disposer + Quasar.Utils.ShortIO hs-source-dirs: src diff --git a/src/Quasar/Async/STMHelper.hs b/src/Quasar/Async/STMHelper.hs index 623379e..6e38644 100644 --- a/src/Quasar/Async/STMHelper.hs +++ b/src/Quasar/Async/STMHelper.hs @@ -9,19 +9,20 @@ module Quasar.Async.STMHelper ( forkWithUnmask_, ) where -import Control.Concurrent (ThreadId, forkIO, forkIOWithUnmask) +import Control.Concurrent (ThreadId, forkIO) import Control.Concurrent.STM import Control.Exception (BlockedIndefinitelyOnSTM) import Control.Monad.Catch import Quasar.Awaitable import Quasar.Exceptions import Quasar.Prelude +import Quasar.Utils.ShortIO newtype TIOWorker = TIOWorker (TQueue (IO ())) -startShortIO :: forall a. IO a -> TIOWorker -> ExceptionChannel -> STM (Awaitable a) +startShortIO :: forall a. ShortIO a -> TIOWorker -> ExceptionChannel -> STM (Awaitable a) startShortIO fn (TIOWorker jobQueue) exChan = do resultVar <- newAsyncVarSTM writeTQueue jobQueue $ job resultVar @@ -29,13 +30,13 @@ startShortIO fn (TIOWorker jobQueue) exChan = do where job :: AsyncVar a -> IO () job resultVar = do - try fn >>= \case + try (runShortIO fn) >>= \case Left ex -> do atomically $ throwToExceptionChannel exChan ex failAsyncVar_ resultVar $ toException $ AsyncException ex Right result -> putAsyncVar_ resultVar result -startShortIO_ :: forall a. IO a -> TIOWorker -> ExceptionChannel -> STM () +startShortIO_ :: forall a. ShortIO a -> TIOWorker -> ExceptionChannel -> STM () startShortIO_ x y z = void $ startShortIO x y z @@ -61,8 +62,8 @@ fork_ fn worker exChan = void $ fork fn worker exChan forkWithUnmask :: ((forall a. IO a -> IO a) -> IO ()) -> TIOWorker -> ExceptionChannel -> STM (Awaitable ThreadId) forkWithUnmask fn worker exChan = startShortIO forkFn worker exChan where - forkFn :: IO ThreadId - forkFn = mask_ $ forkIOWithUnmask wrappedFn + forkFn :: ShortIO ThreadId + forkFn = mask_ $ forkIOWithUnmaskShortIO wrappedFn wrappedFn :: (forall a. IO a -> IO a) -> IO () wrappedFn unmask = fn unmask `catchAll` \ex -> atomically (throwToExceptionChannel exChan ex) diff --git a/src/Quasar/Async/V2.hs b/src/Quasar/Async/V2.hs index 1f8a906..f1ba3d8 100644 --- a/src/Quasar/Async/V2.hs +++ b/src/Quasar/Async/V2.hs @@ -17,7 +17,6 @@ module Quasar.Async.V2 ( import Control.Concurrent (ThreadId) import Control.Concurrent.STM -import Control.Exception (throwTo) import Control.Monad.Catch import Quasar.Async.STMHelper import Quasar.Awaitable @@ -25,6 +24,7 @@ import Quasar.Exceptions import Quasar.Monad import Quasar.Prelude import Quasar.Resources.Disposer +import Quasar.Utils.ShortIO import Control.Monad.Reader @@ -64,12 +64,12 @@ unmanagedAsyncWithUnmask fn worker exChan = do Right retVal -> do putAsyncVar_ resultVar retVal atomically $ disposeEventuallySTM_ disposer - disposeFn :: Unique -> AsyncVar a -> Awaitable ThreadId -> IO (Awaitable ()) + disposeFn :: Unique -> AsyncVar a -> Awaitable ThreadId -> ShortIO (Awaitable ()) disposeFn key resultVar tidAwaitable = do -- Awaits forking of the thread, which should happen immediately (as long as the TIOWorker-invariant isn't broken elsewhere) - tid <- await tidAwaitable + tid <- unsafeShortIO $ await tidAwaitable -- `throwTo` should also happen immediately, as long as `uninterruptibleMask` isn't abused elsewhere - throwTo tid (CancelAsync key) + throwToShortIO tid (CancelAsync key) -- Considered complete once a result (i.e. success or failure) has been stored pure (() <$ toAwaitable resultVar) diff --git a/src/Quasar/Resources/Disposer.hs b/src/Quasar/Resources/Disposer.hs index dee1b4a..5817d1e 100644 --- a/src/Quasar/Resources/Disposer.hs +++ b/src/Quasar/Resources/Disposer.hs @@ -17,7 +17,6 @@ module Quasar.Resources.Disposer ( ) where -import Control.Concurrent (forkIO) import Control.Concurrent.STM import Control.Monad (foldM) import Control.Monad.Catch @@ -30,6 +29,7 @@ import Quasar.Async.STMHelper import Quasar.Awaitable import Quasar.Exceptions import Quasar.Prelude +import Quasar.Utils.ShortIO import Quasar.Utils.TOnce @@ -46,11 +46,10 @@ data Disposer instance Resource Disposer where getDisposer = id -type DisposeFn = IO (Awaitable ()) +type DisposeFn = ShortIO (Awaitable ()) --- TODO document: IO has to be "short" -newPrimitiveDisposer :: IO (Awaitable ()) -> TIOWorker -> ExceptionChannel -> STM Disposer +newPrimitiveDisposer :: ShortIO (Awaitable ()) -> TIOWorker -> ExceptionChannel -> STM Disposer newPrimitiveDisposer fn worker exChan = do key <- newUniqueSTM FnDisposer key worker exChan <$> newTOnce fn <*> newFinalizers @@ -95,18 +94,18 @@ beginDisposeFnDisposer worker exChan disposeState finalizers = startShortIO_ (runDisposeFn awaitableVar disposeFn) worker exChan pure $ join (toAwaitable awaitableVar) - runDisposeFn :: AsyncVar (Awaitable ()) -> DisposeFn -> IO () + runDisposeFn :: AsyncVar (Awaitable ()) -> DisposeFn -> ShortIO () runDisposeFn awaitableVar disposeFn = mask_ $ handleAll exceptionHandler do awaitable <- disposeFn - putAsyncVar_ awaitableVar awaitable + putAsyncVarShortIO_ awaitableVar awaitable runFinalizersAfter finalizers awaitable where - exceptionHandler :: SomeException -> IO () + -- In case of an exception mark disposable as completed to prevent resource managers from being stuck indefinitely + exceptionHandler :: SomeException -> ShortIO () exceptionHandler ex = do - -- In case of an exception mark disposable as completed to prevent resource managers from being stuck indefinitely - putAsyncVar_ awaitableVar (pure ()) - atomically $ runFinalizers finalizers - throwIO $ DisposeException ex + putAsyncVarShortIO_ awaitableVar (pure ()) + runFinalizersShortIO finalizers + throwM $ DisposeException ex disposerKey :: Disposer -> Unique disposerKey (FnDisposer key _ _ _ _) = key @@ -193,7 +192,7 @@ beginDisposeResourceManagerInternal rm = do dependenciesVar <- newAsyncVarSTM writeTVar (resourceManagerState rm) (ResourceManagerDisposing (toAwaitable dependenciesVar)) attachedDisposers <- HM.elems <$> readTVar attachedResources - startShortIO_ (void $ forkIO (disposeThread dependenciesVar attachedDisposers)) worker exChan + startShortIO_ (void $ forkIOShortIO (disposeThread dependenciesVar attachedDisposers)) worker exChan pure $ DisposeDependencies rmKey (toAwaitable dependenciesVar) ResourceManagerDisposing deps -> pure $ DisposeDependencies rmKey deps ResourceManagerDisposed -> pure $ DisposeDependencies rmKey mempty @@ -274,14 +273,17 @@ runFinalizers (Finalizers finalizerVar) = do Just finalizers -> sequence_ finalizers Nothing -> throwM $ userError "runFinalizers was called multiple times (it must only be run once)" -runFinalizersAfter :: Finalizers -> Awaitable () -> IO () +runFinalizersShortIO :: Finalizers -> ShortIO () +runFinalizersShortIO finalizers = unsafeShortIO $ atomically $ runFinalizers finalizers + +runFinalizersAfter :: Finalizers -> Awaitable () -> ShortIO () runFinalizersAfter finalizers awaitable = do -- Peek awaitable to ensure trivial disposables always run without forking - isCompleted <- isJust <$> peekAwaitable awaitable + isCompleted <- isJust <$> peekAwaitableShortIO awaitable if isCompleted then - atomically $ runFinalizers finalizers + runFinalizersShortIO finalizers else - void $ forkIO do + void $ forkIOShortIO do await awaitable atomically $ runFinalizers finalizers diff --git a/src/Quasar/Utils/ShortIO.hs b/src/Quasar/Utils/ShortIO.hs new file mode 100644 index 0000000..c8837e9 --- /dev/null +++ b/src/Quasar/Utils/ShortIO.hs @@ -0,0 +1,44 @@ +module Quasar.Utils.ShortIO ( + ShortIO, + runShortIO, + unsafeShortIO, + + forkIOShortIO, + forkIOWithUnmaskShortIO, + throwToShortIO, + + -- ** Some specific functions required internally + peekAwaitableShortIO, + putAsyncVarShortIO_, +) where + +import Control.Monad.Catch +import Quasar.Awaitable +import Quasar.Prelude +import Control.Concurrent + +newtype ShortIO a = ShortIO (IO a) + deriving newtype (Functor, Applicative, Monad, MonadThrow, MonadCatch, MonadMask) + +runShortIO :: ShortIO a -> IO a +runShortIO (ShortIO fn) = fn + +unsafeShortIO :: IO a -> ShortIO a +unsafeShortIO = ShortIO + + +forkIOShortIO :: IO () -> ShortIO ThreadId +forkIOShortIO fn = ShortIO $ forkIO fn + +forkIOWithUnmaskShortIO :: ((forall a. IO a -> IO a) -> IO ()) -> ShortIO ThreadId +forkIOWithUnmaskShortIO fn = ShortIO $ forkIOWithUnmask fn + +throwToShortIO :: Exception e => ThreadId -> e -> ShortIO () +throwToShortIO tid ex = ShortIO $ throwTo tid ex + + +peekAwaitableShortIO :: Awaitable r -> ShortIO (Maybe r) +peekAwaitableShortIO awaitable = ShortIO $ peekAwaitable awaitable + +putAsyncVarShortIO_ :: AsyncVar a -> a -> ShortIO () +putAsyncVarShortIO_ var value = ShortIO $ putAsyncVar_ var value -- GitLab