module Quasar.ResourceManager (
  -- * MonadResourceManager
  MonadResourceManager(..),
  ResourceManagerT,
  ResourceManagerIO,
  ResourceManagerSTM,
  FailedToRegisterResource,
  attachDisposable,
  registerNewResource,
  registerNewResource_,
  registerDisposable,
  registerDisposeAction,
  registerAsyncDisposeAction,
  throwToResourceManager,
  withScopedResourceManager,
  onResourceManager,
  onResourceManagerSTM,
  captureDisposable,
  captureDisposable_,
  disposeOnError,
  liftResourceManagerIO,
  runInResourceManagerSTM,
  enterResourceManager,
  enterResourceManagerSTM,
  newUniqueRM,

  -- ** Top level initialization
  withRootResourceManager,

  -- ** ResourceManager
  IsResourceManager(..),
  ResourceManager,
  newResourceManager,
  newResourceManagerSTM,
  attachDisposeAction,
  attachDisposeAction_,

  -- ** Linking computations to a resource manager
  linkExecution,
  CancelLinkedExecution,

  -- * Reexports
  CombinedException,
  combinedExceptions,
) where

import Control.Concurrent (ThreadId, forkIO, myThreadId, throwTo, threadDelay)
import Control.Concurrent.STM
import Control.Monad.Catch
import Control.Monad.Reader
import Data.Foldable (toList)
import Data.HashMap.Strict (HashMap)
import Data.HashMap.Strict qualified as HM
import Data.List.NonEmpty ((<|), nonEmpty)
import Data.Sequence (Seq(..), (|>))
import Data.Sequence qualified as Seq
import Quasar.Awaitable
import Quasar.Disposable
import Quasar.Exceptions
import Quasar.Old.UnmanagedAsync
import Quasar.Prelude
import Quasar.Utils.Exceptions


-- | Thrown when a resource could not be registered to a resource manager
-- (e.g. because the resource manager is no longer in its normal state).
data FailedToRegisterResource = FailedToRegisterResource
  deriving stock (Eq, Show)

instance Exception FailedToRegisterResource where
  displayException FailedToRegisterResource = message
    where
      message :: String
      message = "FailedToRegisterResource: Failed to register a resource to a resource manager. This might result in leaked resources if left unhandled."
-- | Thrown when a resource manager cannot be locked because it is no longer in
-- its normal state.
data FailedToLockResourceManager = FailedToLockResourceManager
  deriving stock (Eq, Show)

instance Exception FailedToLockResourceManager where
  displayException FailedToLockResourceManager = message
    where
      message :: String
      message = "FailedToLockResourceManager: Failed to lock a resource manager."


-- TODO HasResourceManager, getResourceManager

-- | Types that can be converted to a `ResourceManager`.
class IsDisposable a => IsResourceManager a where
  toResourceManager :: a -> ResourceManager

-- | Monads that carry a `ResourceManager`.
class MonadFix m => MonadResourceManager m where
  -- | Get the underlying resource manager.
  askResourceManager :: m ResourceManager

  -- | Replace the resource manager for a computation.
  localResourceManager :: IsResourceManager a => a -> m r -> m r

  -- | Locks the resource manager. As long as the resource manager is locked, it's possible to register new resources
  -- on the resource manager.
  --
  -- This prevents the resource manager from disposing, so the computation must not block for an unbounded amount of
  -- time.
  lockResourceManager :: MonadResourceManager m => m a -> m a

  -- | Run an `STM` computation. Depending on the monad this may be run in a dedicated STM transaction or may be
  -- embedded in a larger transaction.
  runInSTM :: MonadResourceManager m => STM a -> m a

  -- | Runs the computation with asynchronous exceptions masked, if masking is meaningful for the monad (the `STM`
  -- based instance leaves the computation unchanged).
  maskIfRequired :: MonadResourceManager m => m a -> m a


-- | Forward an exception that happened asynchronously.
--
-- The exception is wrapped with `toException` and handed to the current resource manager.
throwToResourceManager :: (Exception e, MonadResourceManager m) => e -> m ()
throwToResourceManager ex =
  askResourceManager >>= \target ->
    runInSTM (throwToResourceManagerImpl target (toException ex))

-- | Runs a `ResourceManagerSTM` computation against the current resource manager, using `runInSTM`.
runInResourceManagerSTM :: MonadResourceManager m => ResourceManagerSTM a -> m a
runInResourceManagerSTM action =
  askResourceManager >>= runInSTM . runReaderT action

-- | Register a `Disposable` to the resource manager.
--
-- May throw a `FailedToRegisterResource` if the resource manager is disposing/disposed.
registerDisposable :: (IsDisposable a, MonadResourceManager m) => a -> m ()
registerDisposable disposable =
  askResourceManager >>= \target ->
    runInSTM (attachDisposable target disposable)

-- | Wraps the `IO` action with `newDisposable` and registers the result to the current resource manager.
registerDisposeAction :: MonadResourceManager m => IO () -> m ()
registerDisposeAction disposeAction =
  runInResourceManagerSTM $
    lift (newDisposable disposeAction) >>= registerDisposable

-- | Like `registerDisposeAction`, but the disposable is created with `newAsyncDisposable`.
registerAsyncDisposeAction :: MonadResourceManager m => IO () -> m ()
registerAsyncDisposeAction disposeAction =
  runInResourceManagerSTM $
    lift (newAsyncDisposable disposeAction) >>= registerDisposable

-- | Locks the resource manager (which may fail), runs the computation and registers the resulting disposable.
--
-- The computation will be run in masked state (if not running atomically in `STM`).
--
-- The computation must not block for an unbounded amount of time.
registerNewResource :: (IsDisposable a, MonadResourceManager m) => m a -> m a
registerNewResource action =
  maskIfRequired . lockResourceManager $
    action >>= \created -> created <$ registerDisposable created

-- | Variant of `registerNewResource` that discards the created resource.
registerNewResource_ :: (IsDisposable a, MonadResourceManager m) => m a -> m ()
registerNewResource_ = void . registerNewResource

-- | Runs the computation on a fresh child resource manager, which is disposed when the computation leaves the
-- scope (normally or by exception).
withScopedResourceManager :: (MonadResourceManager m, MonadIO m, MonadMask m) => m a -> m a
withScopedResourceManager action =
  bracket newResourceManager dispose (`localResourceManager` action)


type ResourceManagerT = ReaderT ResourceManager
type ResourceManagerIO = ResourceManagerT IO
type ResourceManagerSTM = ResourceManagerT STM

instance (MonadAwait m, MonadMask m, MonadIO m, MonadFix m) => MonadResourceManager (ResourceManagerT m) where
  localResourceManager target = local (\_ -> toResourceManager target)

  askResourceManager = ask

  lockResourceManager action =
    askResourceManager >>= \target -> lockResourceManagerImpl target action

  -- Every STM computation is run in its own transaction.
  runInSTM = liftIO . atomically

  maskIfRequired = mask_


-- Overlaps the ResourceManagerT/MonadIO-instance, because `MonadIO` _could_ be specified for `STM` (but that would be
-- _very_ incorrect, so this is safe).
instance {-# OVERLAPS #-} MonadResourceManager (ResourceManagerT STM) where
  localResourceManager target = local (\_ -> toResourceManager target)

  askResourceManager = ask

  -- No-op, since STM is always executed atomically.
  lockResourceManager = id

  -- The computation is embedded into the surrounding transaction.
  runInSTM = lift

  maskIfRequired = id


-- Forwards every operation to the inner monad; the outer reader environment is captured and re-applied to the
-- wrapped computation.
instance {-# OVERLAPPABLE #-} MonadResourceManager m => MonadResourceManager (ReaderT r m) where
  askResourceManager = lift askResourceManager

  localResourceManager target action =
    ReaderT \env -> localResourceManager target (runReaderT action env)

  lockResourceManager action =
    ReaderT \env -> lockResourceManager (runReaderT action env)

  runInSTM = lift . runInSTM

  maskIfRequired action =
    ReaderT \env -> maskIfRequired (runReaderT action env)

-- TODO MonadResourceManager instances for StateT, WriterT, RWST, MaybeT, ...
-- | Runs a `ResourceManagerIO` computation with the given resource manager as its environment.
onResourceManager :: (IsResourceManager a, MonadIO m) => a -> ResourceManagerIO r -> m r
onResourceManager target action =
  liftIO . runReaderT action $ toResourceManager target

-- | `STM` counterpart of `onResourceManager`.
onResourceManagerSTM :: (IsResourceManager a) => a -> ResourceManagerSTM r -> STM r
onResourceManagerSTM target action =
  runReaderT action $ toResourceManager target

-- | Runs a `ResourceManagerIO` computation on the resource manager of the current monad.
liftResourceManagerIO :: (MonadResourceManager m, MonadIO m) => ResourceManagerIO r -> m r
liftResourceManagerIO action =
  askResourceManager >>= \current -> onResourceManager current action

-- | Runs the computation on a new child resource manager and returns that resource manager (as a `Disposable`)
-- together with the result of the computation.
captureDisposable :: MonadResourceManager m => m a -> m (a, Disposable)
captureDisposable action = do
  scope <- newResourceManager
  fmap (\result -> (result, toDisposable scope)) (localResourceManager scope action)

-- | Variant of `captureDisposable` for computations without a result.
captureDisposable_ :: MonadResourceManager m => m () -> m Disposable
captureDisposable_ action = snd <$> captureDisposable action

-- | Disposes all resources created by the computation if the computation throws an exception.
disposeOnError :: (MonadResourceManager m, MonadIO m, MonadMask m) => m a -> m a
disposeOnError action =
  bracketOnError newResourceManager dispose (`localResourceManager` action)

-- | Run a computation on a resource manager and throw any exception that occurs to the resource manager.
--
-- This can be used to run e.g. callbacks that belong to a different resource context.
--
-- Locks the resource manager, so the computation must not block for an unbounded time.
--
-- May throw an exception when the resource manager is disposing.
enterResourceManager :: MonadIO m => ResourceManager -> ResourceManagerIO () -> m ()
enterResourceManager resourceManager action =
  liftIO . onResourceManager resourceManager . lockResourceManager $
    catchAll action throwToResourceManager

-- | Run a computation on a resource manager and throw any exception that occurs to the resource manager.
--
-- This can be used to run e.g.
-- callbacks that belong to a different resource context.
enterResourceManagerSTM :: ResourceManager -> ResourceManagerSTM () -> STM ()
enterResourceManagerSTM resourceManager action =
  onResourceManagerSTM resourceManager (catchAll action throwToResourceManager)

-- | Create a new `Unique` in a `MonadResourceManager` monad.
newUniqueRM :: MonadResourceManager m => m Unique
newUniqueRM = runInSTM newUniqueSTM



-- * Resource manager implementations

-- ** Root resource manager

-- | A resource manager is either a normal (child) resource manager, which forwards exceptions to another resource
-- manager, or a root resource manager, which collects them.
data ResourceManager
  -- | Core plus the resource manager that receives exceptions thrown to this one.
  = NormalResourceManager ResourceManagerCore ResourceManager
  -- | Core, "disposing was requested" flag, queue of exceptions thrown to the root (emptied by the dispose worker
  -- once disposing has finished) and the final list of exceptions published by the dispose worker.
  | RootResourceManager ResourceManagerCore (TVar Bool) (TMVar (Seq SomeException)) (AsyncVar [SomeException])

instance IsResourceManager ResourceManager where
  toResourceManager = id

-- | Extracts the core shared by both kinds of resource manager.
resourceManagerCore :: ResourceManager -> ResourceManagerCore
resourceManagerCore = \case
  RootResourceManager core _ _ _ -> core
  NormalResourceManager core _ -> core

-- | Delivers an exception to a resource manager.
--
-- Normal resource managers pass the exception on; the root appends it to its exception queue. Throws a `userError`
-- if the queue of the root has already been taken by the dispose worker.
throwToResourceManagerImpl :: ResourceManager -> SomeException -> STM ()
throwToResourceManagerImpl manager ex =
  case manager of
    NormalResourceManager _ exceptionManager ->
      throwToResourceManagerImpl exceptionManager ex
    RootResourceManager _ _ exceptionsVar _ ->
      tryTakeTMVar exceptionsVar >>= \case
        Nothing ->
          throwM $ userError "Could not throw to resource manager: RootResourceManager is already disposed"
        Just queued ->
          putTMVar exceptionsVar (queued |> toException ex)


instance IsDisposable ResourceManager where
  beginDispose (NormalResourceManager core _) = beginDispose core
  beginDispose (RootResourceManager core disposingVar _ _) = do
    -- Only raises the flag; the actual disposing is done by the dispose worker thread.
    atomically $
      readTVar disposingVar >>= \alreadyDisposing ->
        unless alreadyDisposing (writeTVar disposingVar True)
    pure (defaultResourceManagerDisposeResult core)

  isDisposed = isDisposed . resourceManagerCore

  registerFinalizer = registerFinalizer . resourceManagerCore

-- | Creates a root resource manager and forks its dispose worker thread.
--
-- The worker waits until disposing is requested or an exception is thrown to the root, disposes the core and then
-- publishes all collected exceptions.
newUnmanagedRootResourceManagerInternal :: MonadIO m => m ResourceManager
newUnmanagedRootResourceManagerInternal = liftIO do
  disposingVar <- newTVarIO False
  exceptionsVar <- newTMVarIO Empty
  finalExceptionsVar <- newAsyncVar
  -- The worker and the exception handler of the core both refer to the root that is being constructed.
  mfix \root -> do
    void $ forkIO (disposeWorker root)
    core <- atomically $ newUnmanagedDefaultResourceManagerInternal (throwToResourceManagerImpl root)
    pure $ RootResourceManager core disposingVar exceptionsVar finalExceptionsVar

  where
    disposeWorker :: ResourceManager -> IO ()
    disposeWorker (NormalResourceManager _ _) = unreachableCodePathM
    disposeWorker (RootResourceManager core disposingVar exceptionsVar finalExceptionsVar) =
      handleAll reportWorkerFailure do
        -- Wait until disposing was requested or at least one exception was thrown to the root
        atomically do
          shouldDispose <- (||) <$> readTVar disposingVar <*> (not . Seq.null <$> readTMVar exceptionsVar)
          check shouldDispose

        -- Start a thread to report exceptions (or a potential hang) after a timeout
        reportThread <- unmanagedAsync reportTimeout

        -- Dispose resources
        dispose core

        -- The var is left empty to signal that no more exceptions can be received
        atomically $ takeTMVar exceptionsVar >>= putAsyncVarSTM_ finalExceptionsVar . toList

        -- Clean up timeout/report thread
        dispose reportThread

      where
        reportWorkerFailure :: SomeException -> IO ()
        reportWorkerFailure ex = fail $ "RootResourceManager thread failed unexpectedly: " <> displayException ex

        timeoutSeconds :: Int
        timeoutSeconds = 5

        timeoutMicroseconds :: Int
        timeoutMicroseconds = timeoutSeconds * 1_000_000

        -- Message that is traced when disposing takes longer than the timeout.
        timeoutMessage :: Seq SomeException -> String
        timeoutMessage Empty =
          mconcat ["Root resource manager did not dispose within ", show timeoutSeconds, " seconds"]
        timeoutMessage queued =
          mconcat [
            "Root resource manager did not dispose within ",
            show timeoutSeconds,
            " seconds (",
            show (length queued),
            " exception(s) queued)"
          ]

        reportTimeout :: IO ()
        reportTimeout = do
          threadDelay timeoutMicroseconds
          atomically (tryReadTMVar exceptionsVar) >>= \case
            Nothing -> pure () -- Terminate
            Just queued -> do
              traceIO (timeoutMessage queued)
              reportExceptions 0 queued

        -- Traces the given exceptions one by one; once they are used up, waits for exceptions beyond the
        -- already reported ones (or terminates when the exception var has been emptied).
        reportExceptions :: Int -> Seq SomeException -> IO ()
        reportExceptions alreadyReported = \case
          Empty -> join $ atomically do
            fmap (Seq.drop alreadyReported) <$> tryReadTMVar exceptionsVar >>= \case
              Nothing -> pure $ pure () -- Terminate
              Just Empty -> retry
              Just unreported -> pure $ reportExceptions alreadyReported unreported
          ex :<| remaining -> do
            traceIO $ "Exception thrown to blocked root resource manager: " <> displayException ex
            reportExceptions (alreadyReported + 1) remaining


-- | Runs the computation on a new root resource manager, disposes the resource manager afterwards and waits for the
-- final list of exceptions.
--
-- Exceptions that were thrown to the resource manager are rethrown as a `CombinedException` (together with the
-- exception of the computation itself, if there is one).
withRootResourceManager :: MonadIO m => ResourceManagerIO a -> m a
withRootResourceManager action = liftIO $ uninterruptibleMask \unmask -> do
  resourceManager@(RootResourceManager _ _ _ finalExceptionsVar) <- newUnmanagedRootResourceManagerInternal

  result <- try $ unmask $ onResourceManager resourceManager action

  disposeEventually_ resourceManager
  exceptions <- await finalExceptionsVar

  case (result, nonEmpty exceptions) of
    (Left (ex :: SomeException), Nothing) -> throwM ex
    (Left ex, Just collected) -> throwM $ CombinedException (ex <| collected)
    (Right value, Nothing) -> pure value
    (Right _, Just collected) -> throwM $ CombinedException collected


-- ** Default resource manager

data ResourceManagerCore = ResourceManagerCore {
  -- | Key that identifies this core in its `ResourceManagerResult`.
  resourceManagerKey :: Unique,
  -- | Receives exceptions that occur while disposing.
  throwToHandler :: SomeException -> STM (),
  -- | Current lifecycle state.
  stateVar :: TVar ResourceManagerState,
  -- | Attached disposables; the var is emptied when they are taken for disposing.
  disposablesVar :: TMVar (HashMap Unique Disposable),
  -- | Number of locks that are currently held.
  lockVar :: TVar Word64,
  -- | Filled with the combined result of the attached disposables once disposing has started.
  resultVar :: AsyncVar (Awaitable [ResourceManagerResult]),
  -- | Finalizers that are run when disposing is completed.
  finalizers :: DisposableFinalizers
}

-- | Lifecycle of a `ResourceManagerCore`.
data ResourceManagerState
  = ResourceManagerNormal
  | ResourceManagerDisposing
  | ResourceManagerDisposed


-- | Attaches a `Disposable` to a ResourceManager. It will automatically be disposed when the resource manager is
-- disposed.
--
-- May throw a `FailedToRegisterResource` if the resource manager is disposing/disposed.
attachDisposable :: (IsResourceManager a, IsDisposable b) => a -> b -> STM ()
attachDisposable rm disposable =
  attachDisposableImpl (resourceManagerCore (toResourceManager rm)) (toDisposable disposable)

-- | Adds the disposable to the map of attached disposables (under a fresh key) and registers a finalizer on the
-- disposable that removes the entry again.
--
-- Throws `FailedToRegisterResource` unless the core is in the `ResourceManagerNormal` state.
attachDisposableImpl :: ResourceManagerCore -> Disposable -> STM ()
attachDisposableImpl ResourceManagerCore{stateVar, disposablesVar} disposable = do
  key <- newUniqueSTM
  state <- readTVar stateVar
  case state of
    ResourceManagerNormal -> do
      disposables <- takeTMVar disposablesVar
      putTMVar disposablesVar (HM.insert key disposable disposables)
      void $ registerFinalizer disposable (finalizer key)
    _ -> throwM FailedToRegisterResource
  where
    -- Removes the entry again. Does nothing when the var is empty, i.e. when the disposables have already been
    -- taken for disposing.
    finalizer :: Unique -> STM ()
    finalizer key =
      tryTakeTMVar disposablesVar >>= \case
        Just disposables ->
          putTMVar disposablesVar $ HM.delete key disposables
        Nothing -> pure ()

-- | Runs the computation while holding a lock on the resource manager.
--
-- The lock is a counter: it is incremented before and decremented after the computation (also when the computation
-- throws). While the counter is non-zero, disposing waits before taking the attached disposables.
--
-- Throws `FailedToLockResourceManager` unless the resource manager is in the `ResourceManagerNormal` state.
--
-- NOTE(review): `beginDispose` switches the state to `ResourceManagerDisposing` even while locks are held, and
-- `attachDisposableImpl` rejects every non-normal state, so a registration made under a lock can still fail once
-- disposing has been requested - confirm that this is intended.
lockResourceManagerImpl :: (MonadIO m, MonadMask m) => ResourceManager -> m b -> m b
lockResourceManagerImpl (resourceManagerCore -> ResourceManagerCore{stateVar, lockVar}) =
  bracket_ (liftIO acquire) (liftIO release)
  where
    acquire :: IO ()
    acquire = atomically do
      readTVar stateVar >>= \case
        ResourceManagerNormal -> pure ()
        _ -> throwM FailedToLockResourceManager
      -- Strict update: the counter is only inspected when disposing starts, so a lazy update would pile up one
      -- thunk per lock/unlock on a long-lived resource manager.
      modifyTVar' lockVar (+ 1)

    release :: IO ()
    release = atomically (modifyTVar' lockVar (subtract 1))

instance IsDisposable ResourceManagerCore where
  -- Starts disposing. Only the caller that moves the state from normal to disposing performs (or forks) the actual
  -- dispose work; later callers get a result that refers to the work that is already in progress.
  beginDispose self@ResourceManagerCore{resourceManagerKey, throwToHandler, stateVar, disposablesVar, lockVar, resultVar, finalizers} = liftIO do
    uninterruptibleMask_ do
      -- The transaction decides what to do and returns the IO action that does it.
      join $ atomically do
        state <- readTVar stateVar
        case state of
          ResourceManagerNormal -> do
            writeTVar stateVar ResourceManagerDisposing
            readTVar lockVar >>= \case
              0 -> do
                disposables <- takeDisposables
                pure (primaryBeginDispose disposables)
              -- Locks are held: wait for them on a separate thread
              _ -> pure primaryForkDisposeThread
          ResourceManagerDisposing -> pure $ pure $ defaultResourceManagerDisposeResult self
          ResourceManagerDisposed -> pure $ pure DisposeResultDisposed
    where
      -- Waits (on a new thread) until all locks are released, then begins disposing.
      primaryForkDisposeThread :: IO DisposeResult
      primaryForkDisposeThread = forkDisposeThread do
        disposables <- atomically do
          check =<< (== 0) <$> readTVar lockVar
          takeDisposables
        void $ primaryBeginDispose disposables

      -- Only one thread enters this function (in uninterruptible masked state)
      primaryBeginDispose :: [Disposable] -> IO DisposeResult
      primaryBeginDispose disposables = do
        (reportExceptionActions, resultAwaitables) <- unzip <$> mapM beginDisposeEntry disposables
        -- TODO cache was removed, has to be re-optimized later
        let cachedResultAwaitable = mconcat resultAwaitables
        putAsyncVar_ resultVar cachedResultAwaitable

        let
          isCompletedAwaitable :: Awaitable ()
          isCompletedAwaitable = awaitResourceManagerResult $ ResourceManagerResult resourceManagerKey cachedResultAwaitable

        alreadyCompleted <- isJust <$> peekAwaitable isCompletedAwaitable
        if alreadyCompleted
          then do
            completeDisposing
            pure DisposeResultDisposed
          else do
            -- Start thread to collect exceptions, await completion and run finalizers
            forkDisposeThread do
              -- Collect exceptions from directly attached disposables
              sequence_ reportExceptionActions
              -- Await completion of attached resource managers
              await isCompletedAwaitable
              completeDisposing

      -- Runs the action on a new thread; an exception escaping the action is reported to the exception handler.
      forkDisposeThread :: IO () -> IO DisposeResult
      forkDisposeThread action = do
        defaultResourceManagerDisposeResult self <$ forkIO do
          catchAll
            action
            \ex -> atomically $ throwToHandler $ toException $ userError ("Dispose thread failed for ResourceManager: " <> displayException ex)

      -- Empties the var, so later finalizers of attached disposables become no-ops.
      takeDisposables :: STM [Disposable]
      takeDisposables = toList <$> takeTMVar disposablesVar

      -- Begins disposing a single disposable. Returns an action that reports a failure of the disposable and an
      -- awaitable for its (transitive) resource manager results.
      beginDisposeEntry :: Disposable -> IO (IO (), (Awaitable [ResourceManagerResult]))
      beginDisposeEntry disposable =
        catchAll
          do
            result <- beginDispose disposable
            pure case result of
              DisposeResultDisposed -> (pure (), pure [])
              -- Moves error reporting from the awaitable to the finalizer thread
              DisposeResultAwait awaitable -> (processDisposeException awaitable, [] <$ awaitSuccessOrFailure awaitable)
              DisposeResultResourceManager resourceManagerResult -> (pure (), pure [resourceManagerResult])
          \ex -> do
            atomically $ throwToHandler $ toException $ DisposeException ex
            pure (pure (), pure [])

      -- Awaits the disposable and forwards a failure (wrapped as `DisposeException`) to the exception handler.
      processDisposeException :: Awaitable () -> IO ()
      processDisposeException awaitable =
        await awaitable
        `catchAll`
          \ex -> atomically $ throwToHandler $ toException $ DisposeException ex

      -- Marks the core as disposed and runs its finalizers in the same transaction.
      completeDisposing :: IO ()
      completeDisposing =
        atomically do
          writeTVar stateVar $ ResourceManagerDisposed
          defaultRunFinalizers finalizers

  isDisposed ResourceManagerCore{stateVar} =
    unsafeAwaitSTM do
      disposed <- stateIsDisposed <$> readTVar stateVar
      check disposed
    where
      stateIsDisposed :: ResourceManagerState -> Bool
      stateIsDisposed ResourceManagerDisposed = True
      stateIsDisposed _ = False

  registerFinalizer ResourceManagerCore{finalizers} = defaultRegisterFinalizer finalizers

-- | Dispose result that refers to the (possibly still pending) content of the result var of the core.
defaultResourceManagerDisposeResult :: ResourceManagerCore -> DisposeResult
defaultResourceManagerDisposeResult ResourceManagerCore{resourceManagerKey, resultVar} =
  DisposeResultResourceManager $ ResourceManagerResult resourceManagerKey $ join $ toAwaitable resultVar

-- | Internal constructor. The resulting resource manager core is indirectly attached to its parent by its exception
-- handler.
newUnmanagedDefaultResourceManagerInternal :: (SomeException -> STM ()) -> STM ResourceManagerCore
newUnmanagedDefaultResourceManagerInternal exceptionHandler = do
  key <- newUniqueSTM
  newStateVar <- newTVar ResourceManagerNormal
  newDisposablesVar <- newTMVar HM.empty
  newLockVar <- newTVar 0
  newFinalizers <- newDisposableFinalizersSTM
  newResultVar <- newAsyncVarSTM
  pure ResourceManagerCore {
    resourceManagerKey = key,
    throwToHandler = exceptionHandler,
    stateVar = newStateVar,
    disposablesVar = newDisposablesVar,
    lockVar = newLockVar,
    finalizers = newFinalizers,
    resultVar = newResultVar
  }

-- | Creates a new resource manager as a child of the current resource manager.
newResourceManager :: MonadResourceManager m => m ResourceManager
newResourceManager = askResourceManager >>= runInSTM . newResourceManagerSTM

-- | Creates a new resource manager as a child of the given resource manager.
newResourceManagerSTM :: ResourceManager -> STM ResourceManager
newResourceManagerSTM parent = do
  -- Exceptions of the new core are forwarded to the parent
  core <- newUnmanagedDefaultResourceManagerInternal (throwToResourceManagerImpl parent)
  -- The new core is disposed together with the parent
  NormalResourceManager core parent <$ attachDisposable parent core


-- * Utilities

-- | Creates a `Disposable` that is bound to a ResourceManager. It will automatically be disposed when the resource
-- manager is disposed.
attachDisposeAction :: ResourceManager -> IO () -> STM Disposable
attachDisposeAction resourceManager action =
  newDisposable action >>= \disposable ->
    disposable <$ attachDisposable resourceManager disposable

-- | Attaches a dispose action to a ResourceManager. It will automatically be run when the resource manager is
-- disposed.
attachDisposeAction_ :: ResourceManager -> IO () -> STM ()
attachDisposeAction_ resourceManager = void . attachDisposeAction resourceManager


-- ** Link execution to resource manager

-- | A computation bound to a resource manager with 'linkExecution' should be canceled.
data CancelLinkedExecution = CancelLinkedExecution Unique
  deriving anyclass Exception

instance Show CancelLinkedExecution where
  show _ = "CancelLinkedExecution"


-- | State of a link that is shared between the linked thread and the dispose action.
data LinkState
  -- | The given thread is still running the linked computation.
  = LinkStateLinked ThreadId
  -- | The dispose action has run (and throws to the thread, if it was still linked).
  | LinkStateThrowing
  -- | The thread has left the linked computation.
  | LinkStateCompleted
  deriving stock Eq


-- | Links the execution of a computation to a resource manager.
--
-- The computation is executed on the current thread. When the resource manager is disposed before the computation
-- is completed, a `CancelLinkedExecution`-exception is thrown to the current thread.
--
-- Returns `Nothing` when the computation was canceled that way and `Just` the result otherwise. An exception thrown
-- by the computation itself is rethrown; the link is released first, so disposing the resource manager afterwards
-- does not throw to the thread anymore.
linkExecution :: (MonadResourceManager m, MonadIO m, MonadMask m) => m a -> m (Maybe a)
linkExecution action = do
  key <- liftIO newUnique
  var <- liftIO $ newTVarIO . LinkStateLinked =<< myThreadId

  registerDisposeAction $
    atomically (swapTVar var LinkStateThrowing) >>= \case
      LinkStateLinked threadId -> throwTo threadId $ CancelLinkedExecution key
      LinkStateThrowing -> pure () -- Dispose called twice
      LinkStateCompleted -> pure () -- Thread has already left link

  let
    isOwnCancellation (CancelLinkedExecution exceptionKey) = exceptionKey == key

    -- Marks the link as left. If the dispose action has already started throwing, the cancellation is still on its
    -- way to this thread and has to be received before leaving.
    leaveLink = do
      state <- liftIO $ atomically $ swapTVar var LinkStateCompleted
      when (state == LinkStateThrowing) sleepForever -- Wait for exception to arrive

    -- Releases the link after the computation has failed with `ex`. Nothing to do if `ex` is the cancellation
    -- itself. A cancellation that arrives while leaving is dropped, so the original failure is reported.
    leaveLinkAfterFailure ex =
      unless (maybe False isOwnCancellation (fromException ex)) $
        leaveLink `catch` \cancellation ->
          unless (isOwnCancellation cancellation) $ throwM cancellation

  catch
    do
      result <- action `catchAll` \ex -> leaveLinkAfterFailure ex >> throwM ex
      leaveLink
      pure $ Just result
    \ex@(CancelLinkedExecution exceptionKey) ->
      if key == exceptionKey
        then pure Nothing
        else throwM ex