From 3f5e71efbd0021cf74cf45974e5694e8ecc389d3 Mon Sep 17 00:00:00 2001 From: Jens Nolte <git@queezle.net> Date: Sun, 2 Jan 2022 02:39:00 +0100 Subject: [PATCH] Move throwToResourceManager to MonadResourceManager This adds the capability to report exceptions from both ResourceManagerIO and ResourceManagerSTM, further increasing feature parity. Adds a timeout to the root resource manager before exceptions are logged to the console (usually they are thrown as a CombinedException, so logging is only required for debugging visibility when resource management is deadlocked). --- quasar.cabal | 1 + src/Quasar/Async.hs | 2 +- src/Quasar/ResourceManager.hs | 96 +++++++++++++++++++++--------- test/Quasar/ResourceManagerSpec.hs | 9 +-- 4 files changed, 73 insertions(+), 35 deletions(-) diff --git a/quasar.cabal b/quasar.cabal index d3964b7..ed7c7df 100644 --- a/quasar.cabal +++ b/quasar.cabal @@ -38,6 +38,7 @@ common shared-properties MultiParamTypeClasses NamedFieldPuns NoImplicitPrelude + NumericUnderscores OverloadedStrings PolyKinds RankNTypes diff --git a/src/Quasar/Async.hs b/src/Quasar/Async.hs index ed1745b..1827ee0 100644 --- a/src/Quasar/Async.hs +++ b/src/Quasar/Async.hs @@ -45,7 +45,7 @@ asyncWithUnmask -> m (Async r) asyncWithUnmask action = do resourceManager <- askResourceManager - asyncWithHandlerAndUnmask (throwToResourceManager resourceManager . AsyncException) action + asyncWithHandlerAndUnmask (onResourceManager resourceManager . throwToResourceManager . AsyncException) action async_ :: (MonadResourceManager m, MonadIO m, MonadMask m) => ResourceManagerIO () -> m () async_ action = void $ async action diff --git a/src/Quasar/ResourceManager.hs b/src/Quasar/ResourceManager.hs index 63c643d..0a1bef6 100644 --- a/src/Quasar/ResourceManager.hs +++ b/src/Quasar/ResourceManager.hs @@ -10,6 +10,7 @@ module Quasar.ResourceManager ( registerDisposable, registerDisposeAction, registerAsyncDisposeAction, + throwToResourceManager, withScopedResourceManager, onResourceManager, onResourceManagerSTM, @@ -43,7 +44,7 @@ module Quasar.ResourceManager ( ) where -import Control.Concurrent (ThreadId, forkIO, myThreadId, throwTo) +import Control.Concurrent (ThreadId, forkIO, myThreadId, throwTo, threadDelay) import Control.Concurrent.STM import Control.Monad.Catch import Control.Monad.Reader @@ -97,10 +98,11 @@ class IsDisposable a => IsResourceManager a where lockResourceManagerImpl self = lockResourceManagerImpl (toResourceManager self) -- | Forward an exception that happened asynchronously. - throwToResourceManager :: Exception e => a -> e -> IO () - throwToResourceManager = throwToResourceManager . toResourceManager + throwToResourceManagerImpl :: Exception e => a -> e -> STM () + throwToResourceManagerImpl = throwToResourceManagerImpl . toResourceManager + + {-# MINIMAL toResourceManager | (attachDisposable, lockResourceManagerImpl, throwToResourceManagerImpl) #-} - {-# MINIMAL toResourceManager | (attachDisposable, lockResourceManagerImpl, throwToResourceManager) #-} data ResourceManager = forall a. IsResourceManager a => ResourceManager a @@ -108,7 +110,7 @@ instance IsResourceManager ResourceManager where toResourceManager = id attachDisposable (ResourceManager x) = attachDisposable x lockResourceManagerImpl (ResourceManager x) = lockResourceManagerImpl x - throwToResourceManager (ResourceManager x) = throwToResourceManager x + throwToResourceManagerImpl (ResourceManager x) = throwToResourceManagerImpl x instance IsDisposable ResourceManager where toDisposable (ResourceManager x) = toDisposable x @@ -130,6 +132,13 @@ class MonadFix m => MonadResourceManager m where -- embedded in a larger transaction. runInSTM :: MonadResourceManager m => STM a -> m a + +throwToResourceManager :: (Exception e, MonadResourceManager m) => e -> m () +throwToResourceManager exception = do + resourceManager <- askResourceManager + runInSTM $ throwToResourceManagerImpl resourceManager exception + + runInResourceManagerSTM :: MonadResourceManager m => ResourceManagerSTM a -> m a runInResourceManagerSTM action = do resourceManager <- askResourceManager @@ -257,7 +266,7 @@ disposeOnError action = do enterResourceManager :: MonadIO m => ResourceManager -> ResourceManagerIO () -> m () enterResourceManager resourceManager action = liftIO do onResourceManager resourceManager $ lockResourceManager do - action `catchAll` \ex -> liftIO $ throwToResourceManager resourceManager ex + action `catchAll` \ex -> throwToResourceManager ex -- | Run a computation on a resource manager and throw any exception that occurs to the resource manager. -- @@ -278,16 +287,12 @@ data RootResourceManager instance IsResourceManager RootResourceManager where attachDisposable (RootResourceManager internal _ _ _) = attachDisposable internal lockResourceManagerImpl (RootResourceManager internal _ _ _) = lockResourceManagerImpl internal - throwToResourceManager (RootResourceManager _ _ exceptionsVar _) ex = do - -- TODO only log exceptions after a timeout - traceIO $ "Exception thrown to root resource manager: " <> displayException ex - liftIO $ join $ atomically do - tryTakeTMVar exceptionsVar >>= \case - Just exceptions -> do - putTMVar exceptionsVar (exceptions |> toException ex) - pure $ pure @IO () - Nothing -> do - pure $ fail @IO "Could not throw to resource manager: RootResourceManager is already disposed" + throwToResourceManagerImpl (RootResourceManager _ _ exceptionsVar _) ex = do + tryTakeTMVar exceptionsVar >>= \case + Just exceptions -> do + putTMVar exceptionsVar (exceptions |> toException ex) + Nothing -> do + throwM $ userError "Could not throw to resource manager: RootResourceManager is already disposed" instance IsDisposable RootResourceManager where @@ -306,25 +311,26 @@ newUnmanagedRootResourceManagerInternal = liftIO do exceptionsVar <- newTMVarIO Empty finalExceptionsVar <- newAsyncVar mfix \root -> do - -- TODO reevaluate if using unmanagedAsync and voiding the result is correct - void $ unmanagedAsync (disposeThread root) + void $ forkIO (disposeWorker root) internal <- atomically $ newUnmanagedDefaultResourceManagerInternal (toResourceManager root) pure $ RootResourceManager internal disposingVar exceptionsVar finalExceptionsVar where - disposeThread :: RootResourceManager -> IO () - disposeThread (RootResourceManager internal disposingVar exceptionsVar finalExceptionsVar) = + disposeWorker :: RootResourceManager -> IO () + disposeWorker (RootResourceManager internal disposingVar exceptionsVar finalExceptionsVar) = handleAll do \ex -> fail $ "RootResourceManager thread failed unexpectedly: " <> displayException ex do -- Wait until disposing atomically do disposing <- readTVar disposingVar - hasExceptions <- (> 0) . Seq.length <$> readTMVar exceptionsVar + hasExceptions <- not . Seq.null <$> readTMVar exceptionsVar check $ disposing || hasExceptions - -- TODO start the thread that reports exceptions (or a potential hang) after a timeout + -- Start a thread to report exceptions (or a potential hang) after a timeout + reportThread <- unmanagedAsync reportTimeout + -- Dispose resources dispose internal atomically do @@ -333,6 +339,38 @@ newUnmanagedRootResourceManagerInternal = liftIO do putAsyncVarSTM_ finalExceptionsVar $ toList exceptions + -- Clean up timeout/report thread + dispose reportThread + + where + timeoutSeconds :: Int + timeoutSeconds = 5 + timeoutMicroseconds :: Int + timeoutMicroseconds = timeoutSeconds * 1_000_000 + + reportTimeout :: IO () + reportTimeout = do + threadDelay timeoutMicroseconds + atomically (tryReadTMVar exceptionsVar) >>= \case + Nothing -> pure () -- Terminate + Just Empty -> do + traceIO $ mconcat ["Root resource manager did not dispose within ", show timeoutSeconds, " seconds"] + reportExceptions 0 Empty + Just exs -> do + traceIO $ mconcat [ "Root resource manager did not dispose within ", + show timeoutSeconds, " seconds (", show (length exs), " exception(s) queued)" ] + reportExceptions 0 exs + + reportExceptions :: Int -> Seq SomeException -> IO () + reportExceptions alreadyReported Empty = join $ atomically do + Seq.drop alreadyReported <<$>> tryReadTMVar exceptionsVar >>= \case + Nothing -> pure $ pure () -- Terminate + Just Empty -> retry + Just exs -> pure $ reportExceptions alreadyReported exs + reportExceptions alreadyReported (ex :<| exs) = do + traceIO $ "Exception thrown to blocked root resource manager: " <> displayException ex + reportExceptions (alreadyReported + 1) exs + withRootResourceManager :: MonadIO m => ResourceManagerIO a -> m a withRootResourceManager action = liftIO $ uninterruptibleMask \unmask -> do @@ -352,7 +390,7 @@ withRootResourceManager action = liftIO $ uninterruptibleMask \unmask -> do data DefaultResourceManager = DefaultResourceManager { resourceManagerKey :: Unique, - throwToHandler :: SomeException -> IO (), + throwToHandler :: SomeException -> STM (), stateVar :: TVar ResourceManagerState, disposablesVar :: TMVar (HashMap Unique Disposable), lockVar :: TVar Word64, @@ -366,7 +404,7 @@ data ResourceManagerState | ResourceManagerDisposed instance IsResourceManager DefaultResourceManager where - throwToResourceManager DefaultResourceManager{throwToHandler} = throwToHandler . toException + throwToResourceManagerImpl DefaultResourceManager{throwToHandler} = throwToHandler . toException attachDisposable DefaultResourceManager{stateVar, disposablesVar} disposable = do key <- newUniqueSTM @@ -452,7 +490,9 @@ instance IsDisposable DefaultResourceManager where defaultResourceManagerDisposeResult self <$ forkIO do catchAll action - \ex -> throwToResourceManager self (userError ("Dispose thread failed for DefaultResourceManager: " <> displayException ex)) + \ex -> + onResourceManager self $ throwToResourceManager $ + userError ("Dispose thread failed for DefaultResourceManager: " <> displayException ex) takeDisposables :: STM [Disposable] takeDisposables = toList <$> takeTMVar disposablesVar @@ -468,14 +508,14 @@ instance IsDisposable DefaultResourceManager where DisposeResultAwait awaitable -> (processDisposeException awaitable, [] <$ awaitSuccessOrFailure awaitable) DisposeResultResourceManager resourceManagerResult -> (pure (), pure [resourceManagerResult]) \ex -> do - throwToResourceManager self $ DisposeException ex + onResourceManager self $ throwToResourceManager $ DisposeException ex pure (pure (), pure []) processDisposeException :: Awaitable () -> IO () processDisposeException awaitable = await awaitable `catchAll` - \ex -> throwToResourceManager self $ DisposeException ex + \ex -> onResourceManager self $ throwToResourceManager $ DisposeException ex completeDisposing :: IO () completeDisposing = @@ -511,7 +551,7 @@ newUnmanagedDefaultResourceManagerInternal parentResourceManager = do pure DefaultResourceManager { resourceManagerKey, - throwToHandler = throwToResourceManager parentResourceManager, + throwToHandler = throwToResourceManagerImpl parentResourceManager, stateVar, disposablesVar, lockVar, diff --git a/test/Quasar/ResourceManagerSpec.hs b/test/Quasar/ResourceManagerSpec.hs index 6224d53..1fe2530 100644 --- a/test/Quasar/ResourceManagerSpec.hs +++ b/test/Quasar/ResourceManagerSpec.hs @@ -96,15 +96,13 @@ spec = parallel $ do (`shouldThrow` \(_ :: CombinedException) -> True) do withRootResourceManager do linkExecution do - rm <- askResourceManager - liftIO $ throwToResourceManager rm TestException + throwToResourceManager TestException sleepForever it "combines exceptions from resources with exceptions on the thread" $ io do (`shouldThrow` \(combinedExceptions -> exceptions) -> length exceptions == 2) do withRootResourceManager do - rm <- askResourceManager - liftIO $ throwToResourceManager rm TestException + throwToResourceManager TestException throwM TestException it "can dispose a resource manager loop" $ io do @@ -145,6 +143,5 @@ spec = parallel $ do withRootResourceManager do linkExecution do pure () - rm <- askResourceManager - liftIO $ throwToResourceManager rm TestException + throwToResourceManager TestException liftIO $ threadDelay 100000 -- GitLab