Newer
Older
module Quasar.ResourceManager (
-- * MonadResourceManager
MonadResourceManager(..),
registerNewResource,
registerDisposable,
registerDisposeAction,
registerSimpleDisposeAction,
disposeEventually,
withSubResourceManagerM,
onResourceManager,
captureDisposable,
captureTask,
-- ** ResourceManager
IsResourceManager(..),
ResourceManager,
newResourceManager,
attachDisposeAction,
attachDisposeAction_,
combinedExceptions,
-- ** Linking computations to a resource manager
linkExecution,
CancelLinkedExecution,
-- ** Resource manager implementations
newUnmanagedRootResourceManager,
--newUnmanagedDefaultResourceManager,
import Control.Concurrent (ThreadId, forkIOWithUnmask, myThreadId, throwTo)
import Control.Concurrent.STM
import Control.Monad.Catch
import Control.Monad.Reader
import Data.Foldable (toList)
import Data.List.NonEmpty (NonEmpty(..), nonEmpty)
import Data.Sequence (Seq(..), (|>))
import Data.Sequence qualified as Seq
import Quasar.Awaitable
import Quasar.Disposable
import Quasar.Prelude
data FailedToRegisterResource = FailedToRegisterResource
deriving stock (Eq, Show)
instance Exception FailedToRegisterResource where
displayException FailedToRegisterResource =
"FailedToRegisterResource: Failed to register a resource to a resource manager. This might result in leaked resources if left unhandled."
-- | Internal entry of `ResourceManager`. The `TMVar` will be set to `Nothing` when the disposable has completed disposing.
newtype ResourceManagerEntry = ResourceManagerEntry (TMVar (Awaitable (), Disposable))
instance IsAwaitable () ResourceManagerEntry where
toAwaitable (ResourceManagerEntry var) = do
awaitAny2
do
-- Wait for empty TMVar (dispose completed by resource manager)
unsafeAwaitSTM do
check . isNothing =<< tryReadTMVar var
do
-- Wait for Awaitable (dispose completed externally)
varContents <- unsafeAwaitSTM $ tryReadTMVar var
case varContents of
Nothing -> pure ()
Just (awaitable, _) -> awaitable
newEntry :: IsDisposable a => a -> IO ResourceManagerEntry
newEntry disposable = do
disposedAwaitable <- cacheAwaitable (isDisposed disposable)
ResourceManagerEntry <$> newTMVarIO (disposedAwaitable, toDisposable disposable)
checkEntries :: Seq ResourceManagerEntry -> IO ()
checkEntries = mapM_ checkEntry
checkEntry :: ResourceManagerEntry -> IO ()
checkEntry (ResourceManagerEntry var) = do
atomically (tryReadTMVar var) >>= \case
Nothing -> pure ()
Just (awaitable, _) -> do
completed <- isJust <$> peekAwaitable awaitable
when completed $ atomically $ void $ tryTakeTMVar var
entryIsEmpty :: ResourceManagerEntry -> STM Bool
entryIsEmpty (ResourceManagerEntry var) = isEmptyTMVar var
class IsDisposable a => IsResourceManager a where
toResourceManager :: a -> ResourceManager
-- | Attaches an `Disposable` to a ResourceManager. It will automatically be disposed when the resource manager is disposed.
attachDisposable :: (IsDisposable b, MonadIO m) => a -> b -> m ()
attachDisposable self = attachDisposable (toResourceManager self)
--subResourceManager :: MonadResourceManager m => m (DisposableResourceThingy)
-- | Forward an exception that happened asynchronously.
throwToResourceManager :: Exception e => a -> e -> IO ()
throwToResourceManager = throwToResourceManager . toResourceManager
{-# MINIMAL toResourceManager | (attachDisposable, throwToResourceManager) #-}
data ResourceManager = forall a. IsResourceManager a => ResourceManager a
instance IsResourceManager ResourceManager where
toResourceManager = id
attachDisposable (ResourceManager x) = attachDisposable x
throwToResourceManager (ResourceManager x) = throwToResourceManager x
instance IsDisposable ResourceManager where
toDisposable (ResourceManager x) = toDisposable x
class (MonadAwait m, MonadMask m, MonadIO m, MonadFix m) => MonadResourceManager m where
-- | Get the underlying resource manager.
askResourceManager :: m ResourceManager
-- | Replace the resource manager for a computation.
localResourceManager :: IsResourceManager a => a -> m r -> m r
registerDisposable :: (IsDisposable a, MonadResourceManager m) => a -> m ()
registerDisposable disposable = do
resourceManager <- askResourceManager
attachDisposable resourceManager disposable
registerDisposeAction :: MonadResourceManager m => IO (Awaitable ()) -> m ()
registerDisposeAction disposeAction = mask_ $ registerDisposable =<< newDisposable disposeAction
registerSimpleDisposeAction :: MonadResourceManager m => IO () -> m ()
registerSimpleDisposeAction disposeAction = registerDisposeAction (pure () <$ disposeAction)
registerNewResource :: (IsDisposable a, MonadResourceManager m) => m a -> m a
registerNewResource action = mask_ do
afix \awaitable -> do
registerDisposeAction $ either (\(_ :: SomeException) -> mempty) dispose =<< try (await awaitable)
action
-- TODO rename to withResourceScope?
withSubResourceManagerM :: MonadResourceManager m => m a -> m a
withSubResourceManagerM action =
bracket newResourceManager (await <=< dispose) \scope -> localResourceManager scope action
instance (MonadAwait m, MonadMask m, MonadIO m, MonadFix m) => MonadResourceManager (ReaderT ResourceManager m) where
localResourceManager resourceManager = local (const (toResourceManager resourceManager))
askResourceManager = ask
instance {-# OVERLAPPABLE #-} MonadResourceManager m => MonadResourceManager (ReaderT r m) where
askResourceManager = lift askResourceManager
localResourceManager resourceManager action = do
x <- ask
lift $ localResourceManager resourceManager $ runReaderT action x
-- TODO MonadResourceManager instances for StateT, WriterT, RWST, MaybeT, ...
onResourceManager :: (IsResourceManager a, MonadIO m) => a -> ReaderT ResourceManager IO r -> m r
onResourceManager target action = liftIO $ runReaderT action (toResourceManager target)
captureDisposable :: MonadResourceManager m => m a -> m (a, Disposable)
captureDisposable action = do
-- TODO improve performance by only creating a new resource manager when two or more disposables are attached
resourceManager <- newResourceManager
result <- localResourceManager resourceManager action
pure $ (result, toDisposable resourceManager)
captureDisposable_ :: MonadResourceManager m => m () -> m Disposable
captureDisposable_ = snd <<$>> captureDisposable
captureTask :: MonadResourceManager m => m (Awaitable a) -> m (Task a)
captureTask action = do
(awaitable, disposable) <- captureDisposable action
pure $ Task disposable awaitable
-- | A computation bound to a resource manager with 'linkThread' should be canceled.
data CancelLinkedExecution = CancelLinkedExecution Unique
deriving anyclass Exception
instance Show CancelLinkedExecution where
show _ = "CancelLinkedExecution"
data LinkState = LinkStateLinked ThreadId | LinkStateThrowing | LinkStateCompleted
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
deriving stock Eq
-- | Links the execution of a computation to a resource manager.
--
-- The computation is executed on the current thread. When the resource manager is disposed before the computation
-- is completed, a `CancelLinkedExecution`-exception is thrown to the current thread.
linkExecution :: MonadResourceManager m => m a -> m (Maybe a)
linkExecution action = do
key <- liftIO $ newUnique
var <- liftIO $ newTVarIO =<< LinkStateLinked <$> myThreadId
registerSimpleDisposeAction $ do
atomically (swapTVar var LinkStateThrowing) >>= \case
LinkStateLinked threadId -> throwTo threadId $ CancelLinkedExecution key
LinkStateThrowing -> pure () -- Dispose called twice
LinkStateCompleted -> pure () -- Thread has already left link
catch
do
result <- action
state <- liftIO $ atomically $ swapTVar var LinkStateCompleted
when (state == LinkStateThrowing) $ sleepForever -- Wait for exception to arrive
pure $ Just result
\ex@(CancelLinkedExecution exceptionKey) ->
if key == exceptionKey
then return Nothing
else throwM ex
newtype CombinedException = CombinedException (NonEmpty SomeException)
deriving stock Show
instance Exception CombinedException where
displayException (CombinedException exceptions) = intercalate "\n" (header : exceptionMessages)
where
header = mconcat ["CombinedException with ", show (length exceptions), "exceptions:"]
exceptionMessages = (displayException <$> toList exceptions)
combinedExceptions :: CombinedException -> [SomeException]
combinedExceptions (CombinedException exceptions) = toList exceptions
= RootResourceManager ResourceManager (TVar Bool) (TVar (Maybe (Seq SomeException))) (Awaitable ())
instance IsResourceManager RootResourceManager where
attachDisposable (RootResourceManager child _ _ _) disposable = attachDisposable child disposable
throwToResourceManager (RootResourceManager _ disposingVar exceptionsVar _) ex = do
-- TODO only log exceptions after a timeout
traceIO $ "Exception thrown to root resource manager: " <> displayException ex
liftIO $ join $ atomically do
stateTVar exceptionsVar \case
Just exceptions -> (pure (), Just (exceptions |> toException ex))
Nothing -> (fail @IO "Could not throw to resource manager: RootResourceManager is already disposed", Nothing)
instance IsDisposable RootResourceManager where
dispose (RootResourceManager _ disposingVar _ isDisposedAwaitable) = liftIO do
isDisposedAwaitable <$ atomically do
disposing <- readTVar disposingVar
unless disposing $ writeTVar disposingVar True
isDisposed (RootResourceManager _ _ _ isDisposedAwaitable) = isDisposedAwaitable
newUnmanagedRootResourceManager :: MonadIO m => m ResourceManager
newUnmanagedRootResourceManager = liftIO $ toResourceManager <$> do
disposingVar <- newTVarIO False
exceptionsVar <- newTVarIO (Just Empty)
mfix \root -> do
isDisposedAwaitable <- toAwaitable <$> unmanagedFork (disposeThread root)
child <- newUnmanagedDefaultResourceManager (toResourceManager root)
pure $ RootResourceManager child disposingVar exceptionsVar isDisposedAwaitable
where
disposeThread :: RootResourceManager -> IO ()
disposeThread (RootResourceManager child disposingVar exceptionsVar _) = do
-- Wait until disposing
disposing <- readTVar disposingVar
hasExceptions <- (> 0) . Seq.length <$> (maybe impossibleCodePathM pure =<< readTVar exceptionsVar)
check $ disposing || hasExceptions
-- TODO start the thread that reports exceptions (or a potential hang) after a timeout
exceptions <- atomically do
-- The var is set to `Nothing` to signal that no more exceptions can be received
maybe impossibleCodePathM pure =<< swapTVar exceptionsVar Nothing
-- If there are any exceptions will be stored in the awaitable (isDisposedAwaitable) by throwing them here
mapM_ (throwM . CombinedException) $ nonEmpty $ toList exceptions
withRootResourceManager :: (MonadAwait m, MonadMask m, MonadIO m) => ReaderT ResourceManager IO a -> m a
withRootResourceManager action =
bracket
newUnmanagedRootResourceManager
(`onResourceManager` action)
-- ** Default resource manager
data DefaultResourceManager = DefaultResourceManager {
parentResourceManager :: ResourceManager,
disposingVar :: TVar Bool,
disposedVar :: TVar Bool,
entriesVar :: TVar (Seq ResourceManagerEntry)
}
instance IsResourceManager DefaultResourceManager where
throwToResourceManager DefaultResourceManager{parentResourceManager} = throwToResourceManager parentResourceManager
attachDisposable resourceManager disposable = liftIO $ mask_ do
entry <- newEntry disposable
join $ atomically do
disposing <- readTVar (disposingVar resourceManager)
unless disposing $ modifyTVar (entriesVar resourceManager) (|> entry)
pure do
-- IO that is run after the STM transaction is completed
when disposing $
throwM FailedToRegisterResource `catchAll` throwToResourceManager resourceManager
instance IsDisposable DefaultResourceManager where
dispose resourceManager = liftIO $ mask_ do
entries <- atomically do
isAlreadyDisposing <- swapTVar (disposingVar resourceManager) True
if not isAlreadyDisposing
then readTVar (entriesVar resourceManager)
else pure Empty
mapM_ entryStartDispose entries
entryStartDispose :: ResourceManagerEntry -> IO ()
entryStartDispose (ResourceManagerEntry var) =
atomically (tryReadTMVar var) >>= \case
Nothing -> pure ()
Just (_, disposable) ->
catchAll
do void (dispose disposable)
\ex -> do
-- Disposable failed so it should be removed
atomically (void $ tryTakeTMVar var)
-- This will only throw if the parent is disposed, which would be an illegal state
-- TODO wrap in a 'DisposeException'
throwToResourceManager resourceManager ex
isDisposed resourceManager =
unsafeAwaitSTM do
disposed <- readTVar (disposedVar resourceManager)
unless disposed retry
newResourceManager :: MonadResourceManager m => m ResourceManager
newResourceManager = mask_ do
parent <- askResourceManager
-- TODO: return efficent resource manager
resourceManager <- newUnmanagedDefaultResourceManager parent
registerDisposable resourceManager
pure resourceManager
newUnmanagedDefaultResourceManager :: MonadIO m => ResourceManager -> m ResourceManager
newUnmanagedDefaultResourceManager parentResourceManager = liftIO do
disposingVar <- newTVarIO False
disposedVar <- newTVarIO False
entriesVar <- newTVarIO Empty
let resourceManager = DefaultResourceManager {
parentResourceManager,
disposingVar,
disposedVar,
entriesVar
}
void $ mask_ $ forkIOWithUnmask \unmask ->
unmask (freeGarbage resourceManager)
`catchAll`
\ex -> throwToResourceManager resourceManager (userError ("freeGarbage failed for DefaultResourceManager: " <> displayException ex))
freeGarbage :: DefaultResourceManager -> IO ()
freeGarbage resourceManager = go
where
go :: IO ()
go = do
snapshot <- atomically $ readTVar entriesVar'
let listChanged = unsafeAwaitSTM do
newLength <- Seq.length <$> readTVar entriesVar'
when (newLength == Seq.length snapshot) retry
isDisposing = unsafeAwaitSTM do
disposing <- readTVar (disposingVar resourceManager)
unless disposing retry
-- Wait for any entry to complete or until a new entry is added
let awaitables = (toAwaitable <$> toList snapshot)
-- GC fails here when an waitable throws an exception
void if Quasar.Prelude.null awaitables
then awaitAny2 listChanged isDisposing
else awaitAny (listChanged :| awaitables)
-- Checking entries for completion has to be done in IO.
-- Completion is queried with `entryIsEmpty` during the following STM transaction for legacy reasons (the resource
-- manager once did allow to add resources while disposing). This could be simplified now.
checkEntries =<< atomically (readTVar entriesVar')
join $ atomically $ do
disposing <- readTVar (disposingVar resourceManager)
-- Filter completed entries
allEntries <- readTVar entriesVar'
filteredEntries <- foldM (\acc entry -> entryIsEmpty entry >>= \isEmpty -> pure if isEmpty then acc else acc |> entry) Empty allEntries
writeTVar entriesVar' filteredEntries
if disposing && Seq.null filteredEntries
then do
writeTVar (disposedVar resourceManager) True
pure $ pure ()
else pure go
entriesVar' :: TVar (Seq ResourceManagerEntry)
entriesVar' = entriesVar resourceManager
-- | Creates an `Disposable` that is bound to a ResourceManager. It will automatically be disposed when the resource manager is disposed.
attachDisposeAction :: MonadIO m => ResourceManager -> IO (Awaitable ()) -> m Disposable
attachDisposeAction resourceManager action = liftIO $ mask_ $ do
disposable <- newDisposable action
attachDisposable resourceManager disposable
pure disposable
-- | Attaches a dispose action to a ResourceManager. It will automatically be run when the resource manager is disposed.
attachDisposeAction_ :: MonadIO m => ResourceManager -> IO (Awaitable ()) -> m ()
attachDisposeAction_ resourceManager action = void $ attachDisposeAction resourceManager action
-- | Start disposing a resource but instead of waiting for the operation to complete, pass the responsibility to a
-- `MonadResourceManager`.
--
-- The synchronous part of the `dispose`-Function will be run immediately but the resulting `Awaitable` will be passed
-- to the resource manager.
disposeEventually :: (IsDisposable a, MonadResourceManager m) => a -> m ()
disposeEventually disposable = do
disposeCompleted <- dispose disposable
peekAwaitable disposeCompleted >>= \case
Just () -> pure ()
Nothing -> registerDisposable disposable