diff --git a/quasar.cabal b/quasar.cabal index d3964b7a5f93055222c25064f6420dbd37ec891d..ed7c7dff82745534cfb432d526f8ac996daf3931 100644 --- a/quasar.cabal +++ b/quasar.cabal @@ -38,6 +38,7 @@ common shared-properties MultiParamTypeClasses NamedFieldPuns NoImplicitPrelude + NumericUnderscores OverloadedStrings PolyKinds RankNTypes diff --git a/src/Quasar/Async.hs b/src/Quasar/Async.hs index ed1745b2e438d8d5aa2f49500a4775fcef96ef66..1827ee0cb805e6967a4019b2185c8a8681a8eb67 100644 --- a/src/Quasar/Async.hs +++ b/src/Quasar/Async.hs @@ -45,7 +45,7 @@ asyncWithUnmask -> m (Async r) asyncWithUnmask action = do resourceManager <- askResourceManager - asyncWithHandlerAndUnmask (throwToResourceManager resourceManager . AsyncException) action + asyncWithHandlerAndUnmask (onResourceManager resourceManager . throwToResourceManager . AsyncException) action async_ :: (MonadResourceManager m, MonadIO m, MonadMask m) => ResourceManagerIO () -> m () async_ action = void $ async action diff --git a/src/Quasar/ResourceManager.hs b/src/Quasar/ResourceManager.hs index 63c643ddb182aff5a1b22edae444cfc935e95278..0a1bef62f24eb7f023101cea5e7146963d46b73e 100644 --- a/src/Quasar/ResourceManager.hs +++ b/src/Quasar/ResourceManager.hs @@ -10,6 +10,7 @@ module Quasar.ResourceManager ( registerDisposable, registerDisposeAction, registerAsyncDisposeAction, + throwToResourceManager, withScopedResourceManager, onResourceManager, onResourceManagerSTM, @@ -43,7 +44,7 @@ module Quasar.ResourceManager ( ) where -import Control.Concurrent (ThreadId, forkIO, myThreadId, throwTo) +import Control.Concurrent (ThreadId, forkIO, myThreadId, throwTo, threadDelay) import Control.Concurrent.STM import Control.Monad.Catch import Control.Monad.Reader @@ -97,10 +98,11 @@ class IsDisposable a => IsResourceManager a where lockResourceManagerImpl self = lockResourceManagerImpl (toResourceManager self) -- | Forward an exception that happened asynchronously. - throwToResourceManager :: Exception e => a -> e -> IO () - throwToResourceManager = throwToResourceManager . toResourceManager + throwToResourceManagerImpl :: Exception e => a -> e -> STM () + throwToResourceManagerImpl = throwToResourceManagerImpl . toResourceManager + + {-# MINIMAL toResourceManager | (attachDisposable, lockResourceManagerImpl, throwToResourceManagerImpl) #-} - {-# MINIMAL toResourceManager | (attachDisposable, lockResourceManagerImpl, throwToResourceManager) #-} data ResourceManager = forall a. IsResourceManager a => ResourceManager a @@ -108,7 +110,7 @@ instance IsResourceManager ResourceManager where toResourceManager = id attachDisposable (ResourceManager x) = attachDisposable x lockResourceManagerImpl (ResourceManager x) = lockResourceManagerImpl x - throwToResourceManager (ResourceManager x) = throwToResourceManager x + throwToResourceManagerImpl (ResourceManager x) = throwToResourceManagerImpl x instance IsDisposable ResourceManager where toDisposable (ResourceManager x) = toDisposable x @@ -130,6 +132,13 @@ class MonadFix m => MonadResourceManager m where -- embedded in a larger transaction. runInSTM :: MonadResourceManager m => STM a -> m a + +throwToResourceManager :: (Exception e, MonadResourceManager m) => e -> m () +throwToResourceManager exception = do + resourceManager <- askResourceManager + runInSTM $ throwToResourceManagerImpl resourceManager exception + + runInResourceManagerSTM :: MonadResourceManager m => ResourceManagerSTM a -> m a runInResourceManagerSTM action = do resourceManager <- askResourceManager @@ -257,7 +266,7 @@ disposeOnError action = do enterResourceManager :: MonadIO m => ResourceManager -> ResourceManagerIO () -> m () enterResourceManager resourceManager action = liftIO do onResourceManager resourceManager $ lockResourceManager do - action `catchAll` \ex -> liftIO $ throwToResourceManager resourceManager ex + action `catchAll` \ex -> throwToResourceManager ex -- | Run a computation on a resource manager and throw any exception that occurs to the resource manager. -- @@ -278,16 +287,12 @@ data RootResourceManager instance IsResourceManager RootResourceManager where attachDisposable (RootResourceManager internal _ _ _) = attachDisposable internal lockResourceManagerImpl (RootResourceManager internal _ _ _) = lockResourceManagerImpl internal - throwToResourceManager (RootResourceManager _ _ exceptionsVar _) ex = do - -- TODO only log exceptions after a timeout - traceIO $ "Exception thrown to root resource manager: " <> displayException ex - liftIO $ join $ atomically do - tryTakeTMVar exceptionsVar >>= \case - Just exceptions -> do - putTMVar exceptionsVar (exceptions |> toException ex) - pure $ pure @IO () - Nothing -> do - pure $ fail @IO "Could not throw to resource manager: RootResourceManager is already disposed" + throwToResourceManagerImpl (RootResourceManager _ _ exceptionsVar _) ex = do + tryTakeTMVar exceptionsVar >>= \case + Just exceptions -> do + putTMVar exceptionsVar (exceptions |> toException ex) + Nothing -> do + throwM $ userError "Could not throw to resource manager: RootResourceManager is already disposed" instance IsDisposable RootResourceManager where @@ -306,25 +311,26 @@ newUnmanagedRootResourceManagerInternal = liftIO do exceptionsVar <- newTMVarIO Empty finalExceptionsVar <- newAsyncVar mfix \root -> do - -- TODO reevaluate if using unmanagedAsync and voiding the result is correct - void $ unmanagedAsync (disposeThread root) + void $ forkIO (disposeWorker root) internal <- atomically $ newUnmanagedDefaultResourceManagerInternal (toResourceManager root) pure $ RootResourceManager internal disposingVar exceptionsVar finalExceptionsVar where - disposeThread :: RootResourceManager -> IO () - disposeThread (RootResourceManager internal disposingVar exceptionsVar finalExceptionsVar) = + disposeWorker :: RootResourceManager -> IO () + disposeWorker (RootResourceManager internal disposingVar exceptionsVar finalExceptionsVar) = handleAll do \ex -> fail $ "RootResourceManager thread failed unexpectedly: " <> displayException ex do -- Wait until disposing atomically do disposing <- readTVar disposingVar - hasExceptions <- (> 0) . Seq.length <$> readTMVar exceptionsVar + hasExceptions <- not . Seq.null <$> readTMVar exceptionsVar check $ disposing || hasExceptions - -- TODO start the thread that reports exceptions (or a potential hang) after a timeout + -- Start a thread to report exceptions (or a potential hang) after a timeout + reportThread <- unmanagedAsync reportTimeout + -- Dispose resources dispose internal atomically do @@ -333,6 +339,38 @@ newUnmanagedRootResourceManagerInternal = liftIO do putAsyncVarSTM_ finalExceptionsVar $ toList exceptions + -- Clean up timeout/report thread + dispose reportThread + + where + timeoutSeconds :: Int + timeoutSeconds = 5 + timeoutMicroseconds :: Int + timeoutMicroseconds = timeoutSeconds * 1_000_000 + + reportTimeout :: IO () + reportTimeout = do + threadDelay timeoutMicroseconds + atomically (tryReadTMVar exceptionsVar) >>= \case + Nothing -> pure () -- Terminate + Just Empty -> do + traceIO $ mconcat ["Root resource manager did not dispose within ", show timeoutSeconds, " seconds"] + reportExceptions 0 Empty + Just exs -> do + traceIO $ mconcat [ "Root resource manager did not dispose within ", + show timeoutSeconds, " seconds (", show (length exs), " exception(s) queued)" ] + reportExceptions 0 exs + + reportExceptions :: Int -> Seq SomeException -> IO () + reportExceptions alreadyReported Empty = join $ atomically do + Seq.drop alreadyReported <<$>> tryReadTMVar exceptionsVar >>= \case + Nothing -> pure $ pure () -- Terminate + Just Empty -> retry + Just exs -> pure $ reportExceptions alreadyReported exs + reportExceptions alreadyReported (ex :<| exs) = do + traceIO $ "Exception thrown to blocked root resource manager: " <> displayException ex + reportExceptions (alreadyReported + 1) exs + withRootResourceManager :: MonadIO m => ResourceManagerIO a -> m a withRootResourceManager action = liftIO $ uninterruptibleMask \unmask -> do @@ -352,7 +390,7 @@ withRootResourceManager action = liftIO $ uninterruptibleMask \unmask -> do data DefaultResourceManager = DefaultResourceManager { resourceManagerKey :: Unique, - throwToHandler :: SomeException -> IO (), + throwToHandler :: SomeException -> STM (), stateVar :: TVar ResourceManagerState, disposablesVar :: TMVar (HashMap Unique Disposable), lockVar :: TVar Word64, @@ -366,7 +404,7 @@ data ResourceManagerState | ResourceManagerDisposed instance IsResourceManager DefaultResourceManager where - throwToResourceManager DefaultResourceManager{throwToHandler} = throwToHandler . toException + throwToResourceManagerImpl DefaultResourceManager{throwToHandler} = throwToHandler . toException attachDisposable DefaultResourceManager{stateVar, disposablesVar} disposable = do key <- newUniqueSTM @@ -452,7 +490,9 @@ instance IsDisposable DefaultResourceManager where defaultResourceManagerDisposeResult self <$ forkIO do catchAll action - \ex -> throwToResourceManager self (userError ("Dispose thread failed for DefaultResourceManager: " <> displayException ex)) + \ex -> + onResourceManager self $ throwToResourceManager $ + userError ("Dispose thread failed for DefaultResourceManager: " <> displayException ex) takeDisposables :: STM [Disposable] takeDisposables = toList <$> takeTMVar disposablesVar @@ -468,14 +508,14 @@ instance IsDisposable DefaultResourceManager where DisposeResultAwait awaitable -> (processDisposeException awaitable, [] <$ awaitSuccessOrFailure awaitable) DisposeResultResourceManager resourceManagerResult -> (pure (), pure [resourceManagerResult]) \ex -> do - throwToResourceManager self $ DisposeException ex + onResourceManager self $ throwToResourceManager $ DisposeException ex pure (pure (), pure []) processDisposeException :: Awaitable () -> IO () processDisposeException awaitable = await awaitable `catchAll` - \ex -> throwToResourceManager self $ DisposeException ex + \ex -> onResourceManager self $ throwToResourceManager $ DisposeException ex completeDisposing :: IO () completeDisposing = @@ -511,7 +551,7 @@ newUnmanagedDefaultResourceManagerInternal parentResourceManager = do pure DefaultResourceManager { resourceManagerKey, - throwToHandler = throwToResourceManager parentResourceManager, + throwToHandler = throwToResourceManagerImpl parentResourceManager, stateVar, disposablesVar, lockVar, diff --git a/test/Quasar/ResourceManagerSpec.hs b/test/Quasar/ResourceManagerSpec.hs index 6224d531a1e930c9aa2d8cb5778b34b45089c3ee..1fe25306a78dcb44321f86d942d1acf592f161cc 100644 --- a/test/Quasar/ResourceManagerSpec.hs +++ b/test/Quasar/ResourceManagerSpec.hs @@ -96,15 +96,13 @@ spec = parallel $ do (`shouldThrow` \(_ :: CombinedException) -> True) do withRootResourceManager do linkExecution do - rm <- askResourceManager - liftIO $ throwToResourceManager rm TestException + throwToResourceManager TestException sleepForever it "combines exceptions from resources with exceptions on the thread" $ io do (`shouldThrow` \(combinedExceptions -> exceptions) -> length exceptions == 2) do withRootResourceManager do - rm <- askResourceManager - liftIO $ throwToResourceManager rm TestException + throwToResourceManager TestException throwM TestException it "can dispose a resource manager loop" $ io do @@ -145,6 +143,5 @@ spec = parallel $ do withRootResourceManager do linkExecution do pure () - rm <- askResourceManager - liftIO $ throwToResourceManager rm TestException + throwToResourceManager TestException liftIO $ threadDelay 100000