Skip to content
Snippets Groups Projects
ResourceManager.hs 17.3 KiB
Newer Older
module Quasar.ResourceManager (
  -- * MonadResourceManager
  MonadResourceManager(..),
  FailedToRegisterResource,
  registerDisposable,
  registerDisposeAction,
  registerSimpleDisposeAction,
  disposeEventually,
  withSubResourceManagerM,
  onResourceManager,
  captureDisposable,
  captureDisposable_,
  captureTask,

  -- ** ResourceManager
  IsResourceManager(..),
  ResourceManager,
  newResourceManager,
  attachDisposeAction,
  attachDisposeAction_,

  -- ** Initialization
  CombinedException,
  withRootResourceManager,

  -- ** Linking computations to a resource manager
  linkExecution,
  CancelLinkedExecution,
  -- ** Resource manager implementations
  newUnmanagedRootResourceManager,
  --newUnmanagedDefaultResourceManager,
import Control.Concurrent (ThreadId, forkIOWithUnmask, myThreadId, throwTo)
import Control.Concurrent.STM
import Control.Monad.Catch
import Control.Monad.Reader
import Data.Foldable (toList)
import Data.List.NonEmpty (NonEmpty(..), nonEmpty)
import Data.Sequence (Seq(..), (|>))
import Data.Sequence qualified as Seq
import Quasar.Awaitable
import Quasar.Disposable
import Quasar.Prelude
import Quasar.Utils.Concurrent
data FailedToRegisterResource = FailedToRegisterResource
  deriving stock (Eq, Show)

instance Exception FailedToRegisterResource where
  displayException FailedToRegisterResource =
    "FailedToRegisterResource: Failed to register a resource to a resource manager. This might result in leaked resources if left unhandled."

-- | Internal entry of `ResourceManager`. The `TMVar` will be set to `Nothing` when the disposable has completed disposing.
newtype ResourceManagerEntry = ResourceManagerEntry (TMVar (Awaitable (), Disposable))

instance IsAwaitable () ResourceManagerEntry where
  toAwaitable (ResourceManagerEntry var) = do
    awaitAny2
      do
        -- Wait for empty TMVar (dispose completed by resource manager)
        unsafeAwaitSTM do
          check . isNothing =<< tryReadTMVar var
      do
        -- Wait for Awaitable (dispose completed externally)
        varContents <- unsafeAwaitSTM $ tryReadTMVar var
        case varContents of
          Nothing -> pure ()
          Just (awaitable, _) -> awaitable


newEntry :: IsDisposable a => a -> IO ResourceManagerEntry
newEntry disposable = do
  disposedAwaitable <- cacheAwaitable (isDisposed disposable)
  ResourceManagerEntry <$> newTMVarIO (disposedAwaitable, toDisposable disposable)

checkEntries :: Seq ResourceManagerEntry -> IO ()
checkEntries = mapM_ checkEntry

checkEntry :: ResourceManagerEntry -> IO ()
checkEntry (ResourceManagerEntry var) = do
  atomically (tryReadTMVar var) >>= \case
    Nothing -> pure ()
    Just (awaitable, _) -> do
      completed <- isJust <$> peekAwaitable awaitable
      when completed $ atomically $ void $ tryTakeTMVar var

entryIsEmpty :: ResourceManagerEntry -> STM Bool
entryIsEmpty (ResourceManagerEntry var) = isEmptyTMVar var


class IsDisposable a => IsResourceManager a where
  toResourceManager :: a -> ResourceManager
  toResourceManager = ResourceManager
  -- | Attaches an `Disposable` to a ResourceManager. It will automatically be disposed when the resource manager is disposed.
  attachDisposable :: (IsDisposable b, MonadIO m) => a -> b -> m ()
  attachDisposable self = attachDisposable (toResourceManager self)

  --subResourceManager :: MonadResourceManager m => m (DisposableResourceThingy)

  -- | Forward an exception that happened asynchronously.
  throwToResourceManager :: Exception e => a -> e -> IO ()
  throwToResourceManager = throwToResourceManager . toResourceManager

  {-# MINIMAL toResourceManager | (attachDisposable, throwToResourceManager) #-}

data ResourceManager = forall a. IsResourceManager a => ResourceManager a
instance IsResourceManager ResourceManager where
  toResourceManager = id
  attachDisposable (ResourceManager x) = attachDisposable x
  throwToResourceManager (ResourceManager x) = throwToResourceManager x
instance IsDisposable ResourceManager where
  toDisposable (ResourceManager x) = toDisposable x
class (MonadAwait m, MonadMask m, MonadIO m, MonadFix m) => MonadResourceManager m where
  -- | Get the underlying resource manager.
  askResourceManager :: m ResourceManager

  -- | Replace the resource manager for a computation.
  localResourceManager :: IsResourceManager a => a -> m r -> m r


registerDisposable :: (IsDisposable a, MonadResourceManager m) => a -> m ()
registerDisposable disposable = do
  resourceManager <- askResourceManager
  attachDisposable resourceManager disposable


registerDisposeAction :: MonadResourceManager m => IO (Awaitable ()) -> m ()
registerDisposeAction disposeAction = mask_ $ registerDisposable =<< newDisposable disposeAction

registerSimpleDisposeAction :: MonadResourceManager m => IO () -> m ()
registerSimpleDisposeAction disposeAction = registerDisposeAction (pure () <$ disposeAction)
registerNewResource :: (IsDisposable a, MonadResourceManager m) => m a -> m a
registerNewResource action = mask_ do
  afix \awaitable -> do
    registerDisposeAction $ either (\(_ :: SomeException) -> mempty) dispose =<< try (await awaitable)
    action

-- TODO rename to withResourceScope?
withSubResourceManagerM :: MonadResourceManager m => m a -> m a
withSubResourceManagerM action =
  bracket newResourceManager (await <=< dispose) \scope -> localResourceManager scope action


instance (MonadAwait m, MonadMask m, MonadIO m, MonadFix m) => MonadResourceManager (ReaderT ResourceManager m) where
  localResourceManager resourceManager = local (const (toResourceManager resourceManager))

  askResourceManager = ask


instance {-# OVERLAPPABLE #-} MonadResourceManager m => MonadResourceManager (ReaderT r m) where
  askResourceManager = lift askResourceManager

  localResourceManager resourceManager action = do
    x <- ask
    lift $ localResourceManager resourceManager $ runReaderT action x


-- TODO MonadResourceManager instances for StateT, WriterT, RWST, MaybeT, ...


onResourceManager :: (IsResourceManager a, MonadIO m) => a -> ReaderT ResourceManager IO r -> m r
onResourceManager target action = liftIO $ runReaderT action (toResourceManager target)
captureDisposable :: MonadResourceManager m => m a -> m (a, Disposable)
captureDisposable action = do
  -- TODO improve performance by only creating a new resource manager when two or more disposables are attached
  resourceManager <- newResourceManager
  result <- localResourceManager resourceManager action
  pure $ (result, toDisposable resourceManager)

captureDisposable_ :: MonadResourceManager m => m () -> m Disposable
captureDisposable_ = snd <<$>> captureDisposable

captureTask :: MonadResourceManager m => m (Awaitable a) -> m (Task a)
captureTask action = do
  (awaitable, disposable) <- captureDisposable action
  pure $ Task disposable awaitable
-- | A computation bound to a resource manager with 'linkThread' should be canceled.
data CancelLinkedExecution = CancelLinkedExecution Unique
  deriving anyclass Exception
instance Show CancelLinkedExecution where
  show _ = "CancelLinkedExecution"
data LinkState = LinkStateLinked ThreadId | LinkStateThrowing | LinkStateCompleted
  deriving stock Eq


-- | Links the execution of a computation to a resource manager.
--
-- The computation is executed on the current thread. When the resource manager is disposed before the computation
-- is completed, a `CancelLinkedExecution`-exception is thrown to the current thread.
linkExecution :: MonadResourceManager m => m a -> m (Maybe a)
linkExecution action = do
  key <- liftIO $ newUnique
  var <- liftIO $ newTVarIO =<< LinkStateLinked <$> myThreadId
  registerSimpleDisposeAction $ do
    atomically (swapTVar var LinkStateThrowing) >>= \case
      LinkStateLinked threadId -> throwTo threadId $ CancelLinkedExecution key
      LinkStateThrowing -> pure () -- Dispose called twice
      LinkStateCompleted -> pure () -- Thread has already left link

  catch
    do
      result <- action
      state <- liftIO $ atomically $ swapTVar var LinkStateCompleted
      when (state == LinkStateThrowing) $ sleepForever -- Wait for exception to arrive
      pure $ Just result

    \ex@(CancelLinkedExecution exceptionKey) ->
      if key == exceptionKey
        then return Nothing
        else throwM ex

-- * Resource manager implementations

-- ** Root resource manager
newtype CombinedException = CombinedException (NonEmpty SomeException)
  deriving stock Show

instance Exception CombinedException where
  displayException (CombinedException exceptions) = intercalate "\n" (header : exceptionMessages)
    where
      header = mconcat ["CombinedException with ", show (length exceptions), "exceptions:"]
      exceptionMessages = (displayException <$> toList exceptions)
combinedExceptions :: CombinedException -> [SomeException]
combinedExceptions (CombinedException exceptions) = toList exceptions


data RootResourceManager
  = RootResourceManager ResourceManager (TVar Bool) (TVar (Maybe (Seq SomeException))) (Awaitable ())

instance IsResourceManager RootResourceManager where
  attachDisposable (RootResourceManager child _ _ _) disposable = attachDisposable child disposable
  throwToResourceManager (RootResourceManager _ disposingVar exceptionsVar _) ex = do
    -- TODO only log exceptions after a timeout
    traceIO $ "Exception thrown to root resource manager: " <> displayException ex
    liftIO $ join $ atomically do
      stateTVar exceptionsVar \case
        Just exceptions -> (pure (), Just (exceptions |> toException ex))
        Nothing -> (fail @IO "Could not throw to resource manager: RootResourceManager is already disposed", Nothing)

instance IsDisposable RootResourceManager where
  dispose (RootResourceManager _ disposingVar _ isDisposedAwaitable) = liftIO do
    isDisposedAwaitable <$ atomically do
      disposing <- readTVar disposingVar
      unless disposing $ writeTVar disposingVar True
  isDisposed (RootResourceManager _ _ _ isDisposedAwaitable) = isDisposedAwaitable

newUnmanagedRootResourceManager :: MonadIO m => m ResourceManager
newUnmanagedRootResourceManager = liftIO $ toResourceManager <$> do
  disposingVar <- newTVarIO False
  exceptionsVar <- newTVarIO (Just Empty)
  mfix \root -> do
    isDisposedAwaitable <- toAwaitable <$> unmanagedFork (disposeThread root)
    child <- newUnmanagedDefaultResourceManager (toResourceManager root)
    pure $ RootResourceManager child disposingVar exceptionsVar isDisposedAwaitable

  where
    disposeThread :: RootResourceManager -> IO ()
    disposeThread (RootResourceManager child disposingVar exceptionsVar _) = do
      -- Wait until disposing
        disposing <- readTVar disposingVar
        hasExceptions <- (> 0) . Seq.length <$> (maybe impossibleCodePathM pure =<< readTVar exceptionsVar)
        check $ disposing || hasExceptions

      -- TODO start the thread that reports exceptions (or a potential hang) after a timeout

      await =<< dispose child
        -- The var is set to `Nothing` to signal that no more exceptions can be received
        maybe impossibleCodePathM pure =<< swapTVar exceptionsVar Nothing

      -- If there are any exceptions will be stored in the awaitable (isDisposedAwaitable) by throwing them here
      mapM_ (throwM . CombinedException) $ nonEmpty $ toList exceptions
withRootResourceManager :: (MonadAwait m, MonadMask m, MonadIO m) => ReaderT ResourceManager IO a -> m a
withRootResourceManager action =
  bracket
    newUnmanagedRootResourceManager
    (await <=< dispose)
    (`onResourceManager` action)
-- ** Default resource manager

data DefaultResourceManager = DefaultResourceManager {
  parentResourceManager :: ResourceManager,
  disposingVar :: TVar Bool,
  disposedVar :: TVar Bool,
  entriesVar :: TVar (Seq ResourceManagerEntry)
}

instance IsResourceManager DefaultResourceManager where
  throwToResourceManager DefaultResourceManager{parentResourceManager} = throwToResourceManager parentResourceManager

  attachDisposable resourceManager disposable = liftIO $ mask_ do
    entry <- newEntry disposable

    join $ atomically do
      disposing <- readTVar (disposingVar resourceManager)
      unless disposing $ modifyTVar (entriesVar resourceManager) (|> entry)
      pure do
        -- IO that is run after the STM transaction is completed
        when disposing $
          throwM FailedToRegisterResource `catchAll` throwToResourceManager resourceManager

instance IsDisposable DefaultResourceManager where
  dispose resourceManager = liftIO $ mask_ do
Jens Nolte's avatar
Jens Nolte committed
    entries <- atomically do
      isAlreadyDisposing <- swapTVar (disposingVar resourceManager) True
      if not isAlreadyDisposing
        then readTVar (entriesVar resourceManager)
        else pure Empty

Jens Nolte's avatar
Jens Nolte committed
    pure $ isDisposed resourceManager
      entryStartDispose :: ResourceManagerEntry -> IO ()
      entryStartDispose (ResourceManagerEntry var) =
Jens Nolte's avatar
Jens Nolte committed
        atomically (tryReadTMVar var) >>= \case
          Nothing -> pure ()
          Just (_, disposable) ->
            catchAll
              do void (dispose disposable)
              \ex -> do
                -- Disposable failed so it should be removed
                atomically (void $ tryTakeTMVar var)
                -- This will only throw if the parent is disposed, which would be an illegal state
                -- TODO wrap in a 'DisposeException'
                throwToResourceManager resourceManager ex


  isDisposed resourceManager =
    unsafeAwaitSTM do
      disposed <- readTVar (disposedVar resourceManager)
      unless disposed retry

newResourceManager :: MonadResourceManager m => m ResourceManager
newResourceManager = mask_ do
  parent <- askResourceManager
  -- TODO: return efficent resource manager
  resourceManager <- newUnmanagedDefaultResourceManager parent
  registerDisposable resourceManager
  pure resourceManager

newUnmanagedDefaultResourceManager :: MonadIO m => ResourceManager -> m ResourceManager
newUnmanagedDefaultResourceManager parentResourceManager = liftIO do
  disposingVar <- newTVarIO False
  disposedVar <- newTVarIO False
  entriesVar <- newTVarIO Empty

  let resourceManager = DefaultResourceManager {
    parentResourceManager,
    disposingVar,
    disposedVar,
    entriesVar
  }

  void $ mask_ $ forkIOWithUnmask \unmask ->
    unmask (freeGarbage resourceManager)
      `catchAll`
        \ex -> throwToResourceManager resourceManager (userError ("freeGarbage failed for DefaultResourceManager: " <> displayException ex))
  pure $ toResourceManager resourceManager
freeGarbage :: DefaultResourceManager -> IO ()
freeGarbage resourceManager = go
  where
    go :: IO ()
    go = do
      snapshot <- atomically $ readTVar entriesVar'

      let listChanged = unsafeAwaitSTM do
            newLength <- Seq.length <$> readTVar entriesVar'
            when (newLength == Seq.length snapshot) retry

          isDisposing = unsafeAwaitSTM do
            disposing <- readTVar (disposingVar resourceManager)
            unless disposing retry

      -- Wait for any entry to complete or until a new entry is added
      let awaitables = (toAwaitable <$> toList snapshot)
      -- GC fails here when an waitable throws an exception
      void if Quasar.Prelude.null awaitables
        then awaitAny2 listChanged isDisposing
        else awaitAny (listChanged :| awaitables)

      -- Checking entries for completion has to be done in IO.
      -- Completion is queried with `entryIsEmpty` during the following STM transaction for legacy reasons (the resource
      -- manager once did allow to add resources while disposing). This could be simplified now.
      checkEntries =<< atomically (readTVar entriesVar')

      join $ atomically $ do
        disposing <- readTVar (disposingVar resourceManager)

        -- Filter completed entries
        allEntries <- readTVar entriesVar'
        filteredEntries <- foldM (\acc entry -> entryIsEmpty entry >>= \isEmpty -> pure if isEmpty then acc else acc |> entry) Empty allEntries
        writeTVar entriesVar' filteredEntries

        if disposing && Seq.null filteredEntries
           then do
             writeTVar (disposedVar resourceManager) True
             pure $ pure ()
           else pure go

    entriesVar' :: TVar (Seq ResourceManagerEntry)
    entriesVar' = entriesVar resourceManager


-- | Creates an `Disposable` that is bound to a ResourceManager. It will automatically be disposed when the resource manager is disposed.
attachDisposeAction :: MonadIO m => ResourceManager -> IO (Awaitable ()) -> m Disposable
attachDisposeAction resourceManager action = liftIO $ mask_ $ do
  disposable <- newDisposable action
  attachDisposable resourceManager disposable
  pure disposable

-- | Attaches a dispose action to a ResourceManager. It will automatically be run when the resource manager is disposed.
attachDisposeAction_ :: MonadIO m => ResourceManager -> IO (Awaitable ()) -> m ()
attachDisposeAction_ resourceManager action = void $ attachDisposeAction resourceManager action

-- | Start disposing a resource but instead of waiting for the operation to complete, pass the responsibility to a
-- `MonadResourceManager`.
--
-- The synchronous part of the `dispose`-Function will be run immediately but the resulting `Awaitable` will be passed
-- to the resource manager.
disposeEventually :: (IsDisposable a, MonadResourceManager m) => a -> m ()
disposeEventually disposable = do
  disposeCompleted <- dispose disposable
  peekAwaitable disposeCompleted >>= \case
    Just () -> pure ()
    Nothing -> registerDisposable disposable