diff --git a/quasar.cabal b/quasar.cabal index 215bd82384d6b728e112d614b398b681b658f196..83d12875e127fe60a7558263d84f29b22fe291dc 100644 --- a/quasar.cabal +++ b/quasar.cabal @@ -22,6 +22,7 @@ common shared-properties DeriveAnyClass DeriveGeneric DerivingStrategies + DisambiguateRecordFields DuplicateRecordFields ExistentialQuantification FlexibleContexts @@ -32,6 +33,8 @@ common shared-properties ImportQualifiedPost InstanceSigs LambdaCase + -- Enable once 9.0.1 is required + --LexicalNegation MultiParamTypeClasses NamedFieldPuns NoImplicitPrelude diff --git a/src/Quasar/Async.hs b/src/Quasar/Async.hs index 4a4976ba235efd2ef8e95f76698c9450d92a4bfe..0608eeb23a5b38b9f685b9471103e4b72da38718 100644 --- a/src/Quasar/Async.hs +++ b/src/Quasar/Async.hs @@ -34,6 +34,7 @@ data AsyncContext = forall a. IsAsyncContext a => AsyncContext a instance IsAsyncContext AsyncContext where asyncOnContextWithUnmask (AsyncContext ctx) = asyncOnContextWithUnmask ctx + toAsyncContext = id data UnlimitedAsyncContext = UnlimitedAsyncContext diff --git a/src/Quasar/Awaitable.hs b/src/Quasar/Awaitable.hs index 0c51aec57260571b29afc2778a75ca3c5df91636..91de38d9f19f196d82a30317182cd8e7d2396f17 100644 --- a/src/Quasar/Awaitable.hs +++ b/src/Quasar/Awaitable.hs @@ -316,7 +316,7 @@ instance Alternative AwaitableStepM where x <|> y = x `catchAll` const y empty = throwM $ toException $ userError "empty" -instance MonadPlus AwaitableStepM where +instance MonadPlus AwaitableStepM instance MonadFix AwaitableStepM where mfix :: forall a. (a -> AwaitableStepM a) -> AwaitableStepM a diff --git a/src/Quasar/Disposable.hs b/src/Quasar/Disposable.hs index d018b6f6818a7ab3c7ab7ff4118acd8e9c88d7ce..1b7b45ca0e994f86767feb327cd803cbb6ce224a 100644 --- a/src/Quasar/Disposable.hs +++ b/src/Quasar/Disposable.hs @@ -2,229 +2,181 @@ module Quasar.Disposable ( -- * Disposable IsDisposable(..), Disposable, - disposeAndAwait, + dispose, + disposeEventually, + disposeEventually_, + newDisposable, - synchronousDisposable, noDisposable, - alreadyDisposing, - - -- * Task - Task(..), - cancelTask, - toTask, - completedTask, - successfulTask, - failedTask, - - -- ** Task exceptions - CancelTask(..), - TaskDisposed(..), + + -- * Implementation internals + DisposeResult(..), + ResourceManagerResult(..), + DisposableFinalizers, + newDisposableFinalizers, + defaultRegisterFinalizer, + defaultRunFinalizers, + awaitResourceManagerResult, ) where import Control.Concurrent.STM import Control.Monad.Catch import Control.Monad.Reader +import Data.List.NonEmpty (nonEmpty) +import Data.HashSet (HashSet) +import Data.HashSet qualified as HashSet import Quasar.Awaitable import Quasar.Prelude +import Quasar.Utils.Exceptions -- * Disposable class IsDisposable a where - -- | Dispose a resource. Completion of the returned `Awaitable` signals, that the resource has been released. + -- | Convert an `IsDisposable`-Object to a `Disposable`. -- - -- Dispose should be idempotent, i.e. calling `dispose` once or multiple times should have the same effect. + -- When implementing the `IsDisposable`-class this can be used to defer the dispose behavior to a disposable created + -- by e.g. `newDisposable`. + toDisposable :: a -> Disposable + toDisposable = Disposable + + -- | Begin to dispose (/release) resource(s). + -- + -- The implementation has to be idempotent, i.e. calling `beginDispose` once or multiple times should have the same + -- effect. -- - -- `dispose` should normally be run in /masked/ state. The implementation of `dispose` has to guarantee that - -- resources are disposed even when encountering asynchronous exceptions, or should disable asynchronous exceptions - -- itself (e.g. by using `uninterruptibleMask_`). + -- `beginDispose` must be called in masked state. -- - -- `dispose` should also function correctly when run with uninterruptible exceptions masked. - dispose :: (MonadIO m, MonadMask m) => a -> m (Awaitable ()) - dispose = dispose . toDisposable - -- Regarding the requirements for the masking state (masked, but not uninterruptible) some arguments from - -- `safe-exceptions` were considered: - -- https://github.com/fpco/safe-exceptions/issues/3#issuecomment-230274166 + -- `beginDispose` must not block for an unbounded time. + beginDispose :: a -> IO DisposeResult + beginDispose = beginDispose . toDisposable isDisposed :: a -> Awaitable () isDisposed = isDisposed . toDisposable - -- | Convert an `IsDisposable`-Object to a `Disposable`. - -- - -- When implementing the `IsDisposable`-class this can be used to defer the dispose behavior to a disposable created - -- by e.g. `newDisposable`. - toDisposable :: a -> Disposable - toDisposable = Disposable + -- | Finalizers MUST NOT throw exceptions. + registerFinalizer :: a -> STM () -> STM Bool + registerFinalizer = registerFinalizer . toDisposable + + {-# MINIMAL toDisposable | (beginDispose, isDisposed, registerFinalizer) #-} + +dispose :: MonadIO m => IsDisposable a => a -> m () +dispose disposable = liftIO do + uninterruptibleMask_ (beginDispose disposable) >>= \case + DisposeResultDisposed -> pure () + (DisposeResultAwait awaitable) -> await awaitable + (DisposeResultResourceManager result) -> awaitResourceManagerResult result + +-- | Begin to dispose a resource. +disposeEventually :: (IsDisposable a, MonadIO m) => a -> m (Awaitable ()) +disposeEventually disposable = do + disposeEventually_ disposable + pure $ isDisposed disposable + +-- | Begin to dispose a resource. +disposeEventually_ :: (IsDisposable a, MonadIO m) => a -> m () +disposeEventually_ disposable = liftIO do + uninterruptibleMask_ $ void $ beginDispose disposable - {-# MINIMAL toDisposable | (dispose, isDisposed) #-} +awaitResourceManagerResult :: forall m. MonadAwait m => ResourceManagerResult -> m () +awaitResourceManagerResult = void . go mempty + where + go :: HashSet Unique -> ResourceManagerResult -> m (HashSet Unique) + go keys (ResourceManagerResult key awaitable) + | HashSet.member key keys = pure keys -- resource manager was encountered before + | otherwise = do + dependencies <- await awaitable + foldM go (HashSet.insert key keys) dependencies --- TODO remove -disposeAndAwait :: (MonadAwait m, MonadIO m) => IsDisposable a => a -> m () -disposeAndAwait disposable = await =<< liftIO (dispose disposable) +data DisposeResult + = DisposeResultDisposed + | DisposeResultAwait (Awaitable ()) + | DisposeResultResourceManager ResourceManagerResult +data ResourceManagerResult = ResourceManagerResult Unique (Awaitable [ResourceManagerResult]) instance IsDisposable a => IsDisposable (Maybe a) where toDisposable = maybe noDisposable toDisposable - data Disposable = forall a. IsDisposable a => Disposable a instance IsDisposable Disposable where - dispose (Disposable x) = dispose x + beginDispose (Disposable x) = beginDispose x isDisposed (Disposable x) = isDisposed x + registerFinalizer (Disposable x) = registerFinalizer x toDisposable = id -instance Semigroup Disposable where - x <> y = toDisposable $ CombinedDisposable x y - -instance Monoid Disposable where - mempty = toDisposable EmptyDisposable - mconcat = toDisposable . ListDisposable - instance IsAwaitable () Disposable where toAwaitable = isDisposed -newtype FnDisposable = FnDisposable (TMVar (Either (IO (Awaitable ())) (Awaitable ()))) +data ImmediateDisposable = ImmediateDisposable Unique (TMVar (IO ())) DisposableFinalizers (AsyncVar ()) -instance IsDisposable FnDisposable where - dispose (FnDisposable var) = liftIO do - mask \restore -> do - eitherVal <- atomically do - takeTMVar var >>= \case - l@(Left _action) -> pure l - -- If the var contains an awaitable its put back immediately to save a second transaction - r@(Right _awaitable) -> r <$ putTMVar var r - case eitherVal of - l@(Left action) -> do - awaitable <- restore action `onException` atomically (putTMVar var l) - atomically $ putTMVar var $ Right awaitable - pure awaitable - Right awaitable -> pure awaitable +instance IsDisposable ImmediateDisposable where + beginDispose (ImmediateDisposable key actionVar finalizers resultVar) = do + -- This is only safe when run in masked state + atomically (tryTakeTMVar actionVar) >>= mapM_ \action -> do + result <- try action + atomically do + putAsyncVarEitherSTM_ resultVar result + defaultRunFinalizers finalizers + -- Await so concurrent `beginDispose` calls don't exit too early + await resultVar + pure DisposeResultDisposed - isDisposed = toAwaitable + isDisposed (ImmediateDisposable _ _ _ resultVar) = toAwaitable resultVar `catchAll` \_ -> pure () -instance IsAwaitable () FnDisposable where - toAwaitable :: FnDisposable -> Awaitable () - toAwaitable (FnDisposable var) = - join $ unsafeAwaitSTM do - state <- readTMVar var - case state of - -- Wait until disposing has been started - Left _ -> retry - -- Wait for disposing to complete - Right awaitable -> pure awaitable + registerFinalizer (ImmediateDisposable _ _ finalizers _) = defaultRegisterFinalizer finalizers +newImmediateDisposable :: MonadIO m => IO () -> m Disposable +newImmediateDisposable disposeAction = liftIO do + key <- newUnique + fmap toDisposable $ ImmediateDisposable key <$> newTMVarIO disposeAction <*> newDisposableFinalizers <*> newAsyncVar -data CombinedDisposable = CombinedDisposable Disposable Disposable -instance IsDisposable CombinedDisposable where - dispose (CombinedDisposable x y) = liftA2 (<>) (dispose x) (dispose y) - isDisposed (CombinedDisposable x y) = liftA2 (<>) (isDisposed x) (isDisposed y) - -newtype ListDisposable = ListDisposable [Disposable] - -instance IsDisposable ListDisposable where - dispose (ListDisposable disposables) = mconcat <$> traverse dispose disposables - isDisposed (ListDisposable disposables) = traverse_ isDisposed disposables +-- | Create a new disposable from an IO action. Is is guaranteed, that the IO action will only be called once (even when +-- `dispose` is called multiple times). +newDisposable :: MonadIO m => IO () -> m Disposable +newDisposable = newImmediateDisposable data EmptyDisposable = EmptyDisposable instance IsDisposable EmptyDisposable where - dispose _ = pure $ pure () + beginDispose EmptyDisposable = pure DisposeResultDisposed isDisposed _ = pure () + registerFinalizer _ _ = pure False --- | Create a new disposable from an IO action. Is is guaranteed, that the IO action will only be called once (even when --- `dispose` is called multiple times). -newDisposable :: MonadIO m => IO (Awaitable ()) -> m Disposable -newDisposable action = liftIO $ toDisposable . FnDisposable <$> newTMVarIO (Left action) - --- | Create a new disposable from an IO action. Is is guaranteed, that the IO action will only be called once (even when --- `dispose` is called multiple times). -synchronousDisposable :: MonadIO m => IO () -> m Disposable -synchronousDisposable = newDisposable . fmap pure - -- | A `Disposable` for which `dispose` is a no-op and which reports as already disposed. --- --- Alias for `mempty`. noDisposable :: Disposable -noDisposable = mempty - - -newtype AlreadyDisposing = AlreadyDisposing (Awaitable ()) - -instance IsDisposable AlreadyDisposing where - dispose = pure . isDisposed - isDisposed (AlreadyDisposing awaitable) = awaitable - --- | Create a `Disposable` for a dispose operation which is already in progress. The awaitable passed as a parameter --- is used to track the completion status of the dispose operation. --- --- The disposable is considered to be already disposing (so `dispose` will be a no-op) and is considered disposed once --- the awaitable is completed. -alreadyDisposing :: IsAwaitable () a => a -> Disposable -alreadyDisposing someAwaitable = toDisposable $ AlreadyDisposing $ toAwaitable someAwaitable - - - - - - - - --- | A task is an operation (e.g. a thread or a network request) that is running asynchronously and can be cancelled. --- It has a result and can fail. --- --- The result (or exception) can be aquired by using the `IsAwaitable` class (e.g. by calling `await` or `awaitIO`). --- It is possible to cancel the task by using `dispose` or `cancelTask` if the operation has not been completed. -data Task r = Task Disposable (Awaitable r) - -instance IsAwaitable r (Task r) where - toAwaitable (Task _ awaitable) = awaitable - -instance IsDisposable (Task r) where - toDisposable (Task disposable _) = disposable - -instance Functor Task where - fmap fn (Task disposable awaitable) = Task disposable (fn <$> awaitable) - -instance Applicative Task where - pure value = Task noDisposable (pure value) - liftA2 fn (Task dx fx) (Task dy fy) = Task (dx <> dy) $ liftA2 fn fx fy - --- | Alias for `dispose`. -cancelTask :: Task r -> IO (Awaitable ()) -cancelTask = dispose - --- | Creates an `Task` from an `Awaitable`. --- The resulting task only depends on an external resource, so disposing it has no effect. -toTask :: IsAwaitable r a => a -> Task r -toTask result = Task noDisposable (toAwaitable result) +noDisposable = toDisposable EmptyDisposable -completedTask :: Either SomeException r -> Task r -completedTask result = Task noDisposable (completedAwaitable result) --- | Alias for `pure` -successfulTask :: r -> Task r -successfulTask = pure -failedTask :: SomeException -> Task r -failedTask ex = Task noDisposable (failedAwaitable ex) +-- * Implementation internals +newtype DisposableFinalizers = DisposableFinalizers (TMVar [STM ()]) +newDisposableFinalizers :: IO DisposableFinalizers +newDisposableFinalizers = DisposableFinalizers <$> newTMVarIO [] -data CancelTask = CancelTask - deriving stock Show -instance Exception CancelTask where +defaultRegisterFinalizer :: DisposableFinalizers -> STM () -> STM Bool +defaultRegisterFinalizer (DisposableFinalizers finalizerVar) finalizer = + tryTakeTMVar finalizerVar >>= \case + Just finalizers -> do + putTMVar finalizerVar (finalizer : finalizers) + pure True + Nothing -> pure False -data TaskDisposed = TaskDisposed - deriving stock Show -instance Exception TaskDisposed where +defaultRunFinalizers :: DisposableFinalizers -> STM () +defaultRunFinalizers (DisposableFinalizers finalizerVar) = do + tryTakeTMVar finalizerVar >>= \case + Just finalizers -> sequence_ finalizers + Nothing -> throwM $ userError "defaultRunFinalizers was called multiple times (it must only be run once)" diff --git a/src/Quasar/Observable.hs b/src/Quasar/Observable.hs index c53b6222157ee9bf9fe683ee58ee4f964cf75226..bca82b9fe04be364ae3c782f1eee6e0c811799ef 100644 --- a/src/Quasar/Observable.hs +++ b/src/Quasar/Observable.hs @@ -103,7 +103,7 @@ class IsRetrievable v o => IsObservable v o | o -> v where -- | Old signature of `observe`, will be removed from the class once it's no longer used for implementations. oldObserve :: o -> (ObservableMessage v -> IO ()) -> IO Disposable oldObserve observable callback = do - resourceManager <- newUnmanagedRootResourceManager + resourceManager <- (undefined :: IO ResourceManager) onResourceManager resourceManager do observe observable $ \msg -> liftIO (callback msg) pure $ toDisposable resourceManager @@ -232,7 +232,7 @@ instance IsObservable r (BindObservable r) where oldObserve :: BindObservable r -> (ObservableMessage r -> IO ()) -> IO Disposable oldObserve (BindObservable fx fn) callback = do -- Create a resource manager to ensure all subscriptions are cleaned up when disposing. - resourceManager <- newUnmanagedRootResourceManager + resourceManager <- (undefined :: IO ResourceManager) isDisposingVar <- newTVarIO False disposableVar <- newTMVarIO noDisposable @@ -240,12 +240,12 @@ instance IsObservable r (BindObservable r) where leftDisposable <- oldObserve fx (outerCallback resourceManager isDisposingVar disposableVar keyVar) - attachDisposeAction_ resourceManager $ do - atomically $ writeTVar isDisposingVar True - d1 <- dispose leftDisposable - -- Block while the `outerCallback` is running - d2 <- dispose =<< atomically (takeTMVar disposableVar) - pure (d1 <> d2) + attachDisposeAction_ resourceManager $ undefined -- do + --atomically $ writeTVar isDisposingVar True + --d1 <- dispose leftDisposable + ---- Block while the `outerCallback` is running + --d2 <- dispose =<< atomically (takeTMVar disposableVar) + --pure (d1 <> d2) pure $ toDisposable resourceManager where @@ -300,7 +300,7 @@ instance IsObservable r (CatchObservable e r) where oldObserve :: CatchObservable e r -> (ObservableMessage r -> IO ()) -> IO Disposable oldObserve (CatchObservable fx fn) callback = do -- Create a resource manager to ensure all subscriptions are cleaned up when disposing. - resourceManager <- newUnmanagedRootResourceManager + resourceManager <- (undefined :: IO ResourceManager) isDisposingVar <- newTVarIO False disposableVar <- newTMVarIO noDisposable @@ -308,12 +308,12 @@ instance IsObservable r (CatchObservable e r) where leftDisposable <- oldObserve fx (outerCallback resourceManager isDisposingVar disposableVar keyVar) - attachDisposeAction_ resourceManager $ do - atomically $ writeTVar isDisposingVar True - d1 <- dispose leftDisposable - -- Block while the `outerCallback` is running - d2 <- dispose =<< atomically (takeTMVar disposableVar) - pure (d1 <> d2) + attachDisposeAction_ resourceManager $ undefined -- do + --atomically $ writeTVar isDisposingVar True + --d1 <- dispose leftDisposable + ---- Block while the `outerCallback` is running + --d2 <- dispose =<< atomically (takeTMVar disposableVar) + --pure (d1 <> d2) pure $ toDisposable resourceManager where @@ -369,7 +369,7 @@ instance IsObservable v (ObservableVar v) where -- Call listener callback (pure state) pure (state, HM.insert key callback subscribers) - synchronousDisposable (disposeFn key) + newDisposable (disposeFn key) where disposeFn :: Unique -> IO () disposeFn key = modifyMVar_ mvar (\(state, subscribers) -> pure (state, HM.delete key subscribers)) @@ -426,7 +426,8 @@ instance forall r o0 v0 o1 v1. (IsObservable v0 o0, IsObservable v1 o1) => IsObs var1 <- newTVarIO Nothing d0 <- oldObserve obs0 (mergeCallback var0 var1 . writeTVar var0 . Just) d1 <- oldObserve obs1 (mergeCallback var0 var1 . writeTVar var1 . Just) - pure $ mconcat [d0, d1] + undefined + --pure $ mconcat [d0, d1] where mergeCallback :: TVar (Maybe (ObservableMessage v0)) -> TVar (Maybe (ObservableMessage v1)) -> STM () -> IO () mergeCallback var0 var1 update = do diff --git a/src/Quasar/Observable/ObservableHashMap.hs b/src/Quasar/Observable/ObservableHashMap.hs index e8ab94f26d4a694cc5e58e62bb6ae55936b7ec3e..79daa037cb1e6e270c5b23f4c860324bd394e1dc 100644 --- a/src/Quasar/Observable/ObservableHashMap.hs +++ b/src/Quasar/Observable/ObservableHashMap.hs @@ -40,7 +40,7 @@ instance IsObservable (HM.HashMap k v) (ObservableHashMap k v) where callback $ pure $ toHashMap handle key <- newUnique let handle' = handle {subscribers = HM.insert key callback (subscribers handle)} - (handle',) <$> synchronousDisposable (unsubscribe key) + (handle',) <$> newDisposable (unsubscribe key) unsubscribe :: Unique -> IO () unsubscribe key = modifyHandle_ (\handle -> pure handle {subscribers = HM.delete key (subscribers handle)}) ohm @@ -52,7 +52,7 @@ instance IsDeltaObservable k v (ObservableHashMap k v) where callback (Reset $ toHashMap handle) key <- newUnique let handle' = handle {deltaSubscribers = HM.insert key callback (deltaSubscribers handle)} - (handle',) <$> synchronousDisposable (unsubscribe key) + (handle',) <$> newDisposable (unsubscribe key) unsubscribe :: Unique -> IO () unsubscribe key = modifyHandle_ (\handle -> pure handle {deltaSubscribers = HM.delete key (deltaSubscribers handle)}) ohm @@ -120,7 +120,7 @@ observeKey key ohm@(ObservableHashMap mvar) = synchronousFnObservable observeFn observeFn callback = do subscriptionKey <- newUnique modifyKeyHandle_ (subscribeFn' subscriptionKey) key ohm - synchronousDisposable (unsubscribe subscriptionKey) + newDisposable (unsubscribe subscriptionKey) where subscribeFn' :: Unique -> KeyHandle v -> IO (KeyHandle v) subscribeFn' subKey keyHandle@KeyHandle{value} = do diff --git a/src/Quasar/Observable/ObservablePriority.hs b/src/Quasar/Observable/ObservablePriority.hs index c0528bc4367f60b9e5940e2ff536d21892982477..edb9c50d7f2544a2261cebba208038370e444016 100644 --- a/src/Quasar/Observable/ObservablePriority.hs +++ b/src/Quasar/Observable/ObservablePriority.hs @@ -31,7 +31,7 @@ instance IsObservable (Maybe v) (ObservablePriority p v) where -- Call listener callback (pure (currentValue internals)) pure internals{subscribers = HM.insert key callback subscribers} - synchronousDisposable (unsubscribe key) + newDisposable (unsubscribe key) where unsubscribe :: Unique -> IO () unsubscribe key = modifyMVar_ mvar $ \internals@Internals{subscribers} -> pure internals{subscribers=HM.delete key subscribers} @@ -61,7 +61,7 @@ insertValue :: forall p v. (Ord p, Hashable p) => ObservablePriority p v -> p -> insertValue (ObservablePriority mvar) priority value = modifyMVar mvar $ \internals -> do key <- newUnique newInternals <- insertValue' key internals - (newInternals,) <$> synchronousDisposable (removeValue key) + (newInternals,) <$> newDisposable (removeValue key) where insertValue' :: Unique -> Internals p v -> IO (Internals p v) insertValue' key internals@Internals{priorityMap, current} diff --git a/src/Quasar/ResourceManager.hs b/src/Quasar/ResourceManager.hs index 721f8ef1717eb56d502886ed0ae61ddd46dc2a91..2b28bfe7ba34ef0a9f481046c6042f5f9002b4fb 100644 --- a/src/Quasar/ResourceManager.hs +++ b/src/Quasar/ResourceManager.hs @@ -5,13 +5,13 @@ module Quasar.ResourceManager ( registerNewResource, registerDisposable, registerDisposeAction, - registerSimpleDisposeAction, - disposeEventually, withSubResourceManagerM, onResourceManager, captureDisposable, captureDisposable_, - captureTask, + + -- ** Top level initialization + withRootResourceManager, -- ** ResourceManager IsResourceManager(..), @@ -20,29 +20,24 @@ module Quasar.ResourceManager ( attachDisposeAction, attachDisposeAction_, - -- ** Initialization - withRootResourceManager, - -- ** Linking computations to a resource manager linkExecution, CancelLinkedExecution, - -- ** Resource manager implementations - newUnmanagedRootResourceManager, - --newUnmanagedDefaultResourceManager, - -- * Reexports CombinedException, combinedExceptions, ) where -import Control.Concurrent (ThreadId, forkIOWithUnmask, myThreadId, throwTo) +import Control.Concurrent (ThreadId, forkIO, myThreadId, throwTo) import Control.Concurrent.STM import Control.Monad.Catch import Control.Monad.Reader import Data.Foldable (toList) -import Data.List.NonEmpty (NonEmpty(..), nonEmpty) +import Data.HashMap.Strict (HashMap) +import Data.HashMap.Strict qualified as HM +import Data.List.NonEmpty (NonEmpty(..), (<|), nonEmpty) import Data.Sequence (Seq(..), (|>)) import Data.Sequence qualified as Seq import Quasar.Awaitable @@ -52,6 +47,10 @@ import Quasar.Utils.Concurrent import Quasar.Utils.Exceptions +data DisposeException = DisposeException SomeException + deriving stock Show + deriving anyclass Exception + data FailedToRegisterResource = FailedToRegisterResource deriving stock (Eq, Show) @@ -59,65 +58,38 @@ instance Exception FailedToRegisterResource where displayException FailedToRegisterResource = "FailedToRegisterResource: Failed to register a resource to a resource manager. This might result in leaked resources if left unhandled." --- | Internal entry of `ResourceManager`. The `TMVar` will be set to `Nothing` when the disposable has completed disposing. -newtype ResourceManagerEntry = ResourceManagerEntry (TMVar (Awaitable (), Disposable)) - -instance IsAwaitable () ResourceManagerEntry where - toAwaitable (ResourceManagerEntry var) = do - awaitAny2 - do - -- Wait for empty TMVar (dispose completed by resource manager) - unsafeAwaitSTM do - check . isNothing =<< tryReadTMVar var - do - -- Wait for Awaitable (dispose completed externally) - varContents <- unsafeAwaitSTM $ tryReadTMVar var - case varContents of - Nothing -> pure () - Just (awaitable, _) -> awaitable - - -newEntry :: IsDisposable a => a -> IO ResourceManagerEntry -newEntry disposable = do - disposedAwaitable <- cacheAwaitable (isDisposed disposable) - ResourceManagerEntry <$> newTMVarIO (disposedAwaitable, toDisposable disposable) - -checkEntries :: Seq ResourceManagerEntry -> IO () -checkEntries = mapM_ checkEntry - -checkEntry :: ResourceManagerEntry -> IO () -checkEntry (ResourceManagerEntry var) = do - atomically (tryReadTMVar var) >>= \case - Nothing -> pure () - Just (awaitable, _) -> do - completed <- isJust <$> peekAwaitable awaitable - when completed $ atomically $ void $ tryTakeTMVar var - -entryIsEmpty :: ResourceManagerEntry -> STM Bool -entryIsEmpty (ResourceManagerEntry var) = isEmptyTMVar var +data FailedToLockResourceManager = FailedToLockResourceManager + deriving stock (Eq, Show) +instance Exception FailedToLockResourceManager where + displayException FailedToLockResourceManager = + "FailedToLockResourceManager: Failed to lock a resource manager." class IsDisposable a => IsResourceManager a where toResourceManager :: a -> ResourceManager toResourceManager = ResourceManager -- | Attaches an `Disposable` to a ResourceManager. It will automatically be disposed when the resource manager is disposed. + -- + -- May throw an `FailedToRegisterResource` if the resource manager is disposing/disposed. attachDisposable :: (IsDisposable b, MonadIO m) => a -> b -> m () attachDisposable self = attachDisposable (toResourceManager self) - --subResourceManager :: MonadResourceManager m => m (DisposableResourceThingy) + lockResourceManager :: (IsDisposable b, MonadIO m, MonadMask m) => a -> m b -> m b + lockResourceManager self = lockResourceManager (toResourceManager self) -- | Forward an exception that happened asynchronously. throwToResourceManager :: Exception e => a -> e -> IO () throwToResourceManager = throwToResourceManager . toResourceManager - {-# MINIMAL toResourceManager | (attachDisposable, throwToResourceManager) #-} + {-# MINIMAL toResourceManager | (attachDisposable, lockResourceManager, throwToResourceManager) #-} data ResourceManager = forall a. IsResourceManager a => ResourceManager a instance IsResourceManager ResourceManager where toResourceManager = id attachDisposable (ResourceManager x) = attachDisposable x + lockResourceManager (ResourceManager x) = lockResourceManager x throwToResourceManager (ResourceManager x) = throwToResourceManager x instance IsDisposable ResourceManager where toDisposable (ResourceManager x) = toDisposable x @@ -130,29 +102,31 @@ class (MonadAwait m, MonadMask m, MonadIO m, MonadFix m) => MonadResourceManager localResourceManager :: IsResourceManager a => a -> m r -> m r +-- | Register a `Disposable` to the resource manager. +-- +-- May throw an `FailedToRegisterResource` if the resource manager is disposing/disposed. registerDisposable :: (IsDisposable a, MonadResourceManager m) => a -> m () registerDisposable disposable = do resourceManager <- askResourceManager attachDisposable resourceManager disposable -registerDisposeAction :: MonadResourceManager m => IO (Awaitable ()) -> m () +registerDisposeAction :: MonadResourceManager m => IO () -> m () registerDisposeAction disposeAction = mask_ $ registerDisposable =<< newDisposable disposeAction -registerSimpleDisposeAction :: MonadResourceManager m => IO () -> m () -registerSimpleDisposeAction disposeAction = registerDisposeAction (pure () <$ disposeAction) - registerNewResource :: (IsDisposable a, MonadResourceManager m) => m a -> m a registerNewResource action = mask_ do - afix \awaitable -> do - registerDisposeAction $ either (\(_ :: SomeException) -> mempty) dispose =<< try (await awaitable) - action + resourceManager <- askResourceManager + lockResourceManager resourceManager do + resource <- action + attachDisposable resourceManager resource + pure resource --- TODO rename to withResourceScope? +-- TODO rename to withResourceScope, subResourceManager or withResourceManager? withSubResourceManagerM :: MonadResourceManager m => m a -> m a withSubResourceManagerM action = - bracket newResourceManager (await <=< dispose) \scope -> localResourceManager scope action + bracket newResourceManager dispose \scope -> localResourceManager scope action instance (MonadAwait m, MonadMask m, MonadIO m, MonadFix m) => MonadResourceManager (ReaderT ResourceManager m) where @@ -186,51 +160,6 @@ captureDisposable action = do captureDisposable_ :: MonadResourceManager m => m () -> m Disposable captureDisposable_ = snd <<$>> captureDisposable -captureTask :: MonadResourceManager m => m (Awaitable a) -> m (Task a) -captureTask action = do - (awaitable, disposable) <- captureDisposable action - pure $ Task disposable awaitable - - - --- | A computation bound to a resource manager with 'linkThread' should be canceled. -data CancelLinkedExecution = CancelLinkedExecution Unique - deriving anyclass Exception - -instance Show CancelLinkedExecution where - show _ = "CancelLinkedExecution" - - -data LinkState = LinkStateLinked ThreadId | LinkStateThrowing | LinkStateCompleted - deriving stock Eq - - --- | Links the execution of a computation to a resource manager. --- --- The computation is executed on the current thread. When the resource manager is disposed before the computation --- is completed, a `CancelLinkedExecution`-exception is thrown to the current thread. -linkExecution :: MonadResourceManager m => m a -> m (Maybe a) -linkExecution action = do - key <- liftIO $ newUnique - var <- liftIO $ newTVarIO =<< LinkStateLinked <$> myThreadId - registerSimpleDisposeAction $ do - atomically (swapTVar var LinkStateThrowing) >>= \case - LinkStateLinked threadId -> throwTo threadId $ CancelLinkedExecution key - LinkStateThrowing -> pure () -- Dispose called twice - LinkStateCompleted -> pure () -- Thread has already left link - - catch - do - result <- action - state <- liftIO $ atomically $ swapTVar var LinkStateCompleted - when (state == LinkStateThrowing) $ sleepForever -- Wait for exception to arrive - pure $ Just result - - \ex@(CancelLinkedExecution exceptionKey) -> - if key == exceptionKey - then return Nothing - else throwM ex - -- * Resource manager implementations @@ -238,215 +167,313 @@ linkExecution action = do -- ** Root resource manager data RootResourceManager - = RootResourceManager ResourceManager (TVar Bool) (TVar (Maybe (Seq SomeException))) (Awaitable ()) + = RootResourceManager DefaultResourceManager (TVar Bool) (TMVar (Seq SomeException)) (AsyncVar [SomeException]) instance IsResourceManager RootResourceManager where - attachDisposable (RootResourceManager child _ _ _) disposable = attachDisposable child disposable - throwToResourceManager (RootResourceManager _ disposingVar exceptionsVar _) ex = do + attachDisposable (RootResourceManager internal _ _ _) = attachDisposable internal + lockResourceManager (RootResourceManager internal _ _ _) = lockResourceManager internal + throwToResourceManager (RootResourceManager _ _ exceptionsVar _) ex = do -- TODO only log exceptions after a timeout traceIO $ "Exception thrown to root resource manager: " <> displayException ex liftIO $ join $ atomically do - stateTVar exceptionsVar \case - Just exceptions -> (pure (), Just (exceptions |> toException ex)) - Nothing -> (fail @IO "Could not throw to resource manager: RootResourceManager is already disposed", Nothing) + tryTakeTMVar exceptionsVar >>= \case + Just exceptions -> do + putTMVar exceptionsVar (exceptions |> toException ex) + pure $ pure @IO () + Nothing -> do + pure $ fail @IO "Could not throw to resource manager: RootResourceManager is already disposed" instance IsDisposable RootResourceManager where - dispose (RootResourceManager _ disposingVar _ isDisposedAwaitable) = liftIO do - isDisposedAwaitable <$ atomically do + beginDispose (RootResourceManager internal disposingVar _ _) = do + defaultResourceManagerDisposeResult internal <$ atomically do disposing <- readTVar disposingVar unless disposing $ writeTVar disposingVar True - isDisposed (RootResourceManager _ _ _ isDisposedAwaitable) = isDisposedAwaitable -newUnmanagedRootResourceManager :: MonadIO m => m ResourceManager -newUnmanagedRootResourceManager = liftIO $ toResourceManager <$> do + isDisposed (RootResourceManager internal _ _ _) = isDisposed internal + + registerFinalizer (RootResourceManager internal _ _ _) = registerFinalizer internal + +newUnmanagedRootResourceManagerInternal :: MonadIO m => m RootResourceManager +newUnmanagedRootResourceManagerInternal = liftIO do disposingVar <- newTVarIO False - exceptionsVar <- newTVarIO (Just Empty) + exceptionsVar <- newTMVarIO Empty + finalExceptionsVar <- newAsyncVar mfix \root -> do - isDisposedAwaitable <- toAwaitable <$> unmanagedFork (disposeThread root) - child <- newUnmanagedDefaultResourceManager (toResourceManager root) - pure $ RootResourceManager child disposingVar exceptionsVar isDisposedAwaitable + unmanagedFork_ (disposeThread root) + internal <- newUnmanagedDefaultResourceManagerInternal (toResourceManager root) + pure $ RootResourceManager internal disposingVar exceptionsVar finalExceptionsVar where disposeThread :: RootResourceManager -> IO () - disposeThread (RootResourceManager child disposingVar exceptionsVar _) = do - -- Wait until disposing - atomically do - disposing <- readTVar disposingVar - hasExceptions <- (> 0) . Seq.length <$> (maybe impossibleCodePathM pure =<< readTVar exceptionsVar) - check $ disposing || hasExceptions + disposeThread (RootResourceManager internal disposingVar exceptionsVar finalExceptionsVar) = + handleAll + do \ex -> fail $ "RootResourceManager thread failed unexpectedly: " <> displayException ex + do + -- Wait until disposing + atomically do + disposing <- readTVar disposingVar + hasExceptions <- (> 0) . Seq.length <$> readTMVar exceptionsVar + check $ disposing || hasExceptions - -- TODO start the thread that reports exceptions (or a potential hang) after a timeout + -- TODO start the thread that reports exceptions (or a potential hang) after a timeout - await =<< dispose child + dispose internal - exceptions <- atomically do - -- The var is set to `Nothing` to signal that no more exceptions can be received - maybe impossibleCodePathM pure =<< swapTVar exceptionsVar Nothing + atomically do + -- The var is set to `Nothing` to signal that no more exceptions can be received + exceptions <- takeTMVar exceptionsVar - -- If there are any exceptions will be stored in the awaitable (isDisposedAwaitable) by throwing them here - mapM_ (throwM . CombinedException) $ nonEmpty $ toList exceptions + putAsyncVarSTM_ finalExceptionsVar $ toList exceptions withRootResourceManager :: (MonadAwait m, MonadMask m, MonadIO m) => ReaderT ResourceManager IO a -> m a -withRootResourceManager action = - bracket - newUnmanagedRootResourceManager - (await <=< dispose) - (`onResourceManager` action) +withRootResourceManager action = uninterruptibleMask \unmask -> do + resourceManager@(RootResourceManager _ _ _ finalExceptionsVar) <- newUnmanagedRootResourceManagerInternal + + result <- try $ unmask $ onResourceManager resourceManager action + + disposeEventually_ resourceManager + exceptions <- await finalExceptionsVar + + case result of + Left (ex :: SomeException) -> maybe (throwM ex) (throwM . CombinedException . (ex <|)) (nonEmpty exceptions) + Right result' -> maybe (pure result') (throwM . CombinedException) $ nonEmpty exceptions -- ** Default resource manager data DefaultResourceManager = DefaultResourceManager { - parentResourceManager :: ResourceManager, - disposingVar :: TVar Bool, - disposedVar :: TVar Bool, - entriesVar :: TVar (Seq ResourceManagerEntry) + resourceManagerKey :: Unique, + throwToHandler :: SomeException -> IO (), + stateVar :: TVar ResourceManagerState, + disposablesVar :: TMVar (HashMap Unique Disposable), + lockVar :: TVar Word64, + resultVar :: AsyncVar (Awaitable [ResourceManagerResult]), + finalizers :: DisposableFinalizers } -instance IsResourceManager DefaultResourceManager where - throwToResourceManager DefaultResourceManager{parentResourceManager} = throwToResourceManager parentResourceManager +data ResourceManagerState + = ResourceManagerNormal + | ResourceManagerDisposing + | ResourceManagerDisposed - attachDisposable resourceManager disposable = liftIO $ mask_ do - entry <- newEntry disposable +instance IsResourceManager DefaultResourceManager where + throwToResourceManager DefaultResourceManager{throwToHandler} = throwToHandler . toException + attachDisposable DefaultResourceManager{stateVar, disposablesVar} disposable = liftIO $ mask_ do + key <- newUnique join $ atomically do - disposing <- readTVar (disposingVar resourceManager) - - unless disposing $ modifyTVar (entriesVar resourceManager) (|> entry) + state <- readTVar stateVar + case state of + ResourceManagerNormal -> do + disposables <- takeTMVar disposablesVar + putTMVar disposablesVar (HM.insert key (toDisposable disposable) disposables) + registerFinalizer disposable (finalizer key) + pure $ pure @IO () + _ -> pure $ throwM @IO FailedToRegisterResource + where + finalizer :: Unique -> STM () + finalizer key = + tryTakeTMVar disposablesVar >>= \case + Just disposables -> + putTMVar disposablesVar $ HM.delete key disposables + Nothing -> pure () - pure do - -- IO that is run after the STM transaction is completed - when disposing $ - throwM FailedToRegisterResource `catchAll` throwToResourceManager resourceManager + lockResourceManager DefaultResourceManager{stateVar, lockVar} = + bracket_ (liftIO aquire) (liftIO release) + where + aquire :: IO () + aquire = atomically do + readTVar stateVar >>= \case + ResourceManagerNormal -> pure () + _ -> throwM FailedToLockResourceManager + modifyTVar lockVar (+ 1) + release :: IO () + release = atomically (modifyTVar lockVar (\x -> x - 1)) instance IsDisposable DefaultResourceManager where - dispose resourceManager = liftIO $ mask_ do - entries <- atomically do - isAlreadyDisposing <- swapTVar (disposingVar resourceManager) True - if not isAlreadyDisposing - then readTVar (entriesVar resourceManager) - else pure Empty - - mapM_ entryStartDispose entries - pure $ isDisposed resourceManager + beginDispose self@DefaultResourceManager{resourceManagerKey, stateVar, disposablesVar, lockVar, resultVar, finalizers} = liftIO do + uninterruptibleMask_ do + join $ atomically do + state <- readTVar stateVar + case state of + ResourceManagerNormal -> do + writeTVar stateVar $ ResourceManagerDisposing + readTVar lockVar >>= \case + 0 -> do + disposables <- takeDisposables + pure (primaryBeginDispose disposables) + _ -> pure primaryForkDisposeThread + ResourceManagerDisposing -> pure $ pure $ defaultResourceManagerDisposeResult self + ResourceManagerDisposed -> pure $ pure DisposeResultDisposed where - entryStartDispose :: ResourceManagerEntry -> IO () - entryStartDispose (ResourceManagerEntry var) = - atomically (tryReadTMVar var) >>= \case - Nothing -> pure () - Just (_, disposable) -> - catchAll - do void (dispose disposable) - \ex -> do - -- Disposable failed so it should be removed - atomically (void $ tryTakeTMVar var) - -- This will only throw if the parent is disposed, which would be an illegal state - -- TODO wrap in a 'DisposeException' - throwToResourceManager resourceManager ex - - - isDisposed resourceManager = + primaryForkDisposeThread :: IO DisposeResult + primaryForkDisposeThread = forkDisposeThread do + disposables <- atomically do + check =<< (== 0) <$> readTVar lockVar + takeDisposables + void $ primaryBeginDispose disposables + + -- Only one thread enters this function (in uninterruptible masked state) + primaryBeginDispose :: [Disposable] -> IO DisposeResult + primaryBeginDispose disposables = do + (reportExceptionActions, resultAwaitables) <- unzip <$> mapM beginDisposeEntry disposables + cachedResultAwaitable <- cacheAwaitable $ mconcat resultAwaitables + putAsyncVar_ resultVar cachedResultAwaitable + + let + isCompletedAwaitable :: Awaitable () + isCompletedAwaitable = awaitResourceManagerResult $ ResourceManagerResult resourceManagerKey cachedResultAwaitable + + alreadyCompleted <- isJust <$> peekAwaitable isCompletedAwaitable + if alreadyCompleted + then do + completeDisposing + pure DisposeResultDisposed + else do + -- Start thread to collect exceptions, await completion and run finalizers + forkDisposeThread do + -- Collect exceptions from directly attached disposables + sequence_ reportExceptionActions + -- Await completion attached resource managers + await isCompletedAwaitable + + completeDisposing + + forkDisposeThread :: IO () -> IO DisposeResult + forkDisposeThread action = do + defaultResourceManagerDisposeResult self <$ forkIO do + catchAll + action + \ex -> throwToResourceManager self (userError ("Dispose thread failed for DefaultResourceManager: " <> displayException ex)) + + takeDisposables :: STM [Disposable] + takeDisposables = toList <$> takeTMVar disposablesVar + + beginDisposeEntry :: Disposable -> IO (IO (), (Awaitable [ResourceManagerResult])) + beginDisposeEntry disposable = + catchAll + do + result <- beginDispose disposable + pure case result of + DisposeResultDisposed -> (pure (), pure []) + -- Moves error reporting from the awaitable to the finalizer thread + DisposeResultAwait awaitable -> (processDisposeException awaitable, [] <$ awaitSuccessOrFailure awaitable) + DisposeResultResourceManager resourceManagerResult -> (pure (), pure [resourceManagerResult]) + \ex -> do + throwToResourceManager self $ DisposeException ex + pure (pure (), pure []) + + processDisposeException :: Awaitable () -> IO () + processDisposeException awaitable = + await awaitable + `catchAll` + \ex -> throwToResourceManager self $ DisposeException ex + + completeDisposing :: IO () + completeDisposing = + atomically do + writeTVar stateVar $ ResourceManagerDisposed + defaultRunFinalizers finalizers + + isDisposed DefaultResourceManager{stateVar} = unsafeAwaitSTM do - disposed <- readTVar (disposedVar resourceManager) - unless disposed retry + disposed <- stateIsDisposed <$> readTVar stateVar + check disposed + where + stateIsDisposed :: ResourceManagerState -> Bool + stateIsDisposed ResourceManagerDisposed = True + stateIsDisposed _ = False + + registerFinalizer DefaultResourceManager{finalizers} = defaultRegisterFinalizer finalizers + +defaultResourceManagerDisposeResult :: DefaultResourceManager -> DisposeResult +defaultResourceManagerDisposeResult DefaultResourceManager{resourceManagerKey, resultVar} = + DisposeResultResourceManager $ ResourceManagerResult resourceManagerKey $ join $ toAwaitable resultVar + +newUnmanagedDefaultResourceManager :: MonadIO m => ResourceManager -> m ResourceManager +newUnmanagedDefaultResourceManager parentResourceManager = liftIO do + toResourceManager <$> newUnmanagedDefaultResourceManagerInternal parentResourceManager + +newUnmanagedDefaultResourceManagerInternal :: MonadIO m => ResourceManager -> m DefaultResourceManager +newUnmanagedDefaultResourceManagerInternal parentResourceManager = liftIO do + resourceManagerKey <- newUnique + stateVar <- newTVarIO ResourceManagerNormal + disposablesVar <- newTMVarIO HM.empty + lockVar <- newTVarIO 0 + finalizers <- newDisposableFinalizers + resultVar <- newAsyncVar + + pure DefaultResourceManager { + resourceManagerKey, + throwToHandler = throwToResourceManager parentResourceManager, + stateVar, + disposablesVar, + lockVar, + finalizers, + resultVar + } newResourceManager :: MonadResourceManager m => m ResourceManager newResourceManager = mask_ do parent <- askResourceManager - -- TODO: return efficent resource manager resourceManager <- newUnmanagedDefaultResourceManager parent registerDisposable resourceManager pure resourceManager -newUnmanagedDefaultResourceManager :: MonadIO m => ResourceManager -> m ResourceManager -newUnmanagedDefaultResourceManager parentResourceManager = liftIO do - disposingVar <- newTVarIO False - disposedVar <- newTVarIO False - entriesVar <- newTVarIO Empty - - let resourceManager = DefaultResourceManager { - parentResourceManager, - disposingVar, - disposedVar, - entriesVar - } - - void $ mask_ $ forkIOWithUnmask \unmask -> - unmask (freeGarbage resourceManager) - `catchAll` - \ex -> throwToResourceManager resourceManager (userError ("freeGarbage failed for DefaultResourceManager: " <> displayException ex)) - - pure $ toResourceManager resourceManager +-- * Utilities -freeGarbage :: DefaultResourceManager -> IO () -freeGarbage resourceManager = go - where - go :: IO () - go = do - snapshot <- atomically $ readTVar entriesVar' - - let listChanged = unsafeAwaitSTM do - newLength <- Seq.length <$> readTVar entriesVar' - when (newLength == Seq.length snapshot) retry +-- | Creates an `Disposable` that is bound to a ResourceManager. It will automatically be disposed when the resource manager is disposed. +attachDisposeAction :: MonadIO m => ResourceManager -> IO () -> m Disposable +attachDisposeAction resourceManager action = liftIO $ mask_ $ do + disposable <- newDisposable action + attachDisposable resourceManager disposable + pure disposable - isDisposing = unsafeAwaitSTM do - disposing <- readTVar (disposingVar resourceManager) - unless disposing retry +-- | Attaches a dispose action to a ResourceManager. It will automatically be run when the resource manager is disposed. +attachDisposeAction_ :: MonadIO m => ResourceManager -> IO () -> m () +attachDisposeAction_ resourceManager action = void $ attachDisposeAction resourceManager action - -- Wait for any entry to complete or until a new entry is added - let awaitables = (toAwaitable <$> toList snapshot) - -- GC fails here when an waitable throws an exception - void if Quasar.Prelude.null awaitables - then awaitAny2 listChanged isDisposing - else awaitAny (listChanged :| awaitables) - -- Checking entries for completion has to be done in IO. - -- Completion is queried with `entryIsEmpty` during the following STM transaction for legacy reasons (the resource - -- manager once did allow to add resources while disposing). This could be simplified now. - checkEntries =<< atomically (readTVar entriesVar') +-- ** Link execution to resource manager - join $ atomically $ do - disposing <- readTVar (disposingVar resourceManager) +-- | A computation bound to a resource manager with 'linkThread' should be canceled. +data CancelLinkedExecution = CancelLinkedExecution Unique + deriving anyclass Exception - -- Filter completed entries - allEntries <- readTVar entriesVar' - filteredEntries <- foldM (\acc entry -> entryIsEmpty entry >>= \isEmpty -> pure if isEmpty then acc else acc |> entry) Empty allEntries - writeTVar entriesVar' filteredEntries +instance Show CancelLinkedExecution where + show _ = "CancelLinkedExecution" - if disposing && Seq.null filteredEntries - then do - writeTVar (disposedVar resourceManager) True - pure $ pure () - else pure go - entriesVar' :: TVar (Seq ResourceManagerEntry) - entriesVar' = entriesVar resourceManager +data LinkState = LinkStateLinked ThreadId | LinkStateThrowing | LinkStateCompleted + deriving stock Eq --- * Utilities +-- | Links the execution of a computation to a resource manager. +-- +-- The computation is executed on the current thread. When the resource manager is disposed before the computation +-- is completed, a `CancelLinkedExecution`-exception is thrown to the current thread. +linkExecution :: MonadResourceManager m => m a -> m (Maybe a) +linkExecution action = do + key <- liftIO $ newUnique + var <- liftIO $ newTVarIO =<< LinkStateLinked <$> myThreadId + registerDisposeAction $ do + atomically (swapTVar var LinkStateThrowing) >>= \case + LinkStateLinked threadId -> throwTo threadId $ CancelLinkedExecution key + LinkStateThrowing -> pure () -- Dispose called twice + LinkStateCompleted -> pure () -- Thread has already left link --- | Creates an `Disposable` that is bound to a ResourceManager. It will automatically be disposed when the resource manager is disposed. -attachDisposeAction :: MonadIO m => ResourceManager -> IO (Awaitable ()) -> m Disposable -attachDisposeAction resourceManager action = liftIO $ mask_ $ do - disposable <- newDisposable action - attachDisposable resourceManager disposable - pure disposable + catch + do + result <- action + state <- liftIO $ atomically $ swapTVar var LinkStateCompleted + when (state == LinkStateThrowing) $ sleepForever -- Wait for exception to arrive + pure $ Just result --- | Attaches a dispose action to a ResourceManager. It will automatically be run when the resource manager is disposed. -attachDisposeAction_ :: MonadIO m => ResourceManager -> IO (Awaitable ()) -> m () -attachDisposeAction_ resourceManager action = void $ attachDisposeAction resourceManager action + \ex@(CancelLinkedExecution exceptionKey) -> + if key == exceptionKey + then return Nothing + else throwM ex --- | Start disposing a resource but instead of waiting for the operation to complete, pass the responsibility to a --- `MonadResourceManager`. --- --- The synchronous part of the `dispose`-Function will be run immediately but the resulting `Awaitable` will be passed --- to the resource manager. -disposeEventually :: (IsDisposable a, MonadResourceManager m) => a -> m () -disposeEventually disposable = do - disposeCompleted <- dispose disposable - peekAwaitable disposeCompleted >>= \case - Just () -> pure () - Nothing -> registerDisposable disposable diff --git a/src/Quasar/Subscribable.hs b/src/Quasar/Subscribable.hs index ba0a651d3297cd79b66e12f1f3d1771cd6f006de..e053481c83077e92eed6c20d817d52180ed1eaa1 100644 --- a/src/Quasar/Subscribable.hs +++ b/src/Quasar/Subscribable.hs @@ -74,7 +74,7 @@ instance IsSubscribable r (SubscribableEvent r) where liftIO $ atomically do callbackMap <- readTVar tvar writeTVar tvar $ HM.insert key (\msg -> runReaderT (callback msg) resourceManager) callbackMap - registerDisposable =<< synchronousDisposable (disposeFn key) + registerDisposable =<< newDisposable (disposeFn key) where disposeFn :: Unique -> IO () disposeFn key = atomically do diff --git a/src/Quasar/Timer.hs b/src/Quasar/Timer.hs index e43934a01274c6dd58d8612d61e9aa1fa869e739..8bf24f31d12b33130f6a8989f58ac1f2a57450ad 100644 --- a/src/Quasar/Timer.hs +++ b/src/Quasar/Timer.hs @@ -47,16 +47,20 @@ instance Ord Timer where x `compare` y = time x `compare` time y instance IsDisposable Timer where - dispose self = liftIO do - atomically do - cancelled <- failAsyncVarSTM (completed self) TimerCancelled - when cancelled do - modifyTVar (activeCount (scheduler self)) (+ (-1)) - modifyTVar (cancelledCount (scheduler self)) (+ 1) - pure $ isDisposed self + beginDispose = undefined + + --dispose self = liftIO do + -- atomically do + -- cancelled <- failAsyncVarSTM (completed self) TimerCancelled + -- when cancelled do + -- modifyTVar (activeCount (scheduler self)) (+ (-1)) + -- modifyTVar (cancelledCount (scheduler self)) (+ 1) + -- pure $ isDisposed self isDisposed = awaitSuccessOrFailure . completed + registerFinalizer = undefined + instance IsAwaitable () Timer where toAwaitable = toAwaitable . completed @@ -177,7 +181,7 @@ newTimer scheduler time = do sleepUntil :: TimerScheduler -> UTCTime -> IO () -sleepUntil scheduler time = bracketOnError (newTimer scheduler time) disposeAndAwait await +sleepUntil scheduler time = bracketOnError (newTimer scheduler time) dispose await diff --git a/src/Quasar/Utils/Concurrent.hs b/src/Quasar/Utils/Concurrent.hs index 1d143fdafe6f92da5924a5694f96205596b628cf..fe36c38db661c2593ba01a35f64bc532537e3e49 100644 --- a/src/Quasar/Utils/Concurrent.hs +++ b/src/Quasar/Utils/Concurrent.hs @@ -1,8 +1,13 @@ module Quasar.Utils.Concurrent ( + Task, unmanagedFork, unmanagedFork_, unmanagedForkWithUnmask, unmanagedForkWithUnmask_, + + -- ** Task exceptions + CancelTask(..), + TaskDisposed(..), )where @@ -14,6 +19,57 @@ import Quasar.Disposable import Quasar.Prelude + + +-- | A task is an operation (e.g. a thread or a network request) that is running asynchronously and can be cancelled. +-- It has a result and can fail. +-- +-- The result (or exception) can be aquired by using the `IsAwaitable` class (e.g. by calling `await` or `awaitIO`). +-- It is possible to cancel the task by using `dispose` or `cancelTask` if the operation has not been completed. +data Task r = Task Unique (TVar TaskState) DisposableFinalizers (Awaitable r) + +data TaskState = TaskStateInitializing | TaskStateRunning ThreadId | TaskStateThrowing | TaskStateCompleted + +instance IsAwaitable r (Task r) where + toAwaitable (Task _ _ _ resultAwaitable) = resultAwaitable + +instance IsDisposable (Task r) where + beginDispose self@(Task key stateVar _ _) = uninterruptibleMask_ do + join $ atomically do + readTVar stateVar >>= \case + TaskStateInitializing -> impossibleCodePathM + TaskStateRunning threadId -> do + writeTVar stateVar TaskStateThrowing + pure do + throwTo threadId $ CancelTask key + atomically $ writeTVar stateVar TaskStateCompleted + TaskStateThrowing -> pure $ pure () + TaskStateCompleted -> pure $ pure () + + -- Wait for task completion or failure. Tasks must not ignore `CancelTask` or this will hang. + pure $ DisposeResultAwait $ isDisposed self + + isDisposed (Task _ _ _ resultAwaitable) = (() <$ resultAwaitable) `catchAll` \_ -> pure () + + registerFinalizer (Task _ _ finalizers _) = defaultRegisterFinalizer finalizers + +instance Functor Task where + fmap fn (Task key actionVar finalizerVar resultAwaitable) = Task key actionVar finalizerVar (fn <$> resultAwaitable) + + +data CancelTask = CancelTask Unique +instance Show CancelTask where + show _ = "CancelTask" +instance Exception CancelTask where + +data TaskDisposed = TaskDisposed + deriving stock Show +instance Exception TaskDisposed where + + + + + unmanagedFork :: MonadIO m => IO a -> m (Task a) unmanagedFork action = unmanagedForkWithUnmask \unmask -> unmask action @@ -23,42 +79,41 @@ unmanagedFork_ action = toDisposable <$> unmanagedFork action unmanagedForkWithUnmask :: MonadIO m => ((forall b. IO b -> IO b) -> IO a) -> m (Task a) unmanagedForkWithUnmask action = do liftIO $ mask_ do + key <- newUnique resultVar <- newAsyncVar - threadIdVar <- newEmptyTMVarIO - - disposable <- newDisposable $ disposeTask threadIdVar resultVar - - onException - do - atomically . putTMVar threadIdVar . Just =<< - forkIOWithUnmask \unmask -> do - result <- try $ catch - do action unmask - \CancelTask -> throwIO TaskDisposed - - putAsyncVarEither_ resultVar result - - -- The `action` has completed its work. - -- "disarm" the disposer thread ... - void $ atomically $ swapTMVar threadIdVar Nothing - -- .. then fire the disposable to release resources (the disposer thread) and to signal that this thread is - -- disposed. - await =<< dispose disposable - - do atomically $ putTMVar threadIdVar Nothing - - pure $ Task disposable (toAwaitable resultVar) - where - disposeTask :: TMVar (Maybe ThreadId) -> AsyncVar r -> IO (Awaitable ()) - disposeTask threadIdVar resultVar = mask_ do - -- Blocks until the thread is forked - atomically (swapTMVar threadIdVar Nothing) >>= \case - -- Thread completed or initialization failed - Nothing -> pure () - Just threadId -> throwTo threadId CancelTask - - -- Wait for task completion or failure. Tasks must not ignore `CancelTask` or this will hang. - pure $ void (toAwaitable resultVar) `catchAll` const (pure ()) + stateVar <- newTVarIO TaskStateInitializing + finalizers <- newDisposableFinalizers + + threadId <- forkIOWithUnmask \unmask -> + handleAll + do \ex -> fail $ "unmanagedForkWithUnmask thread failed: " <> displayException ex + do + result <- try $ handleIf + do \(CancelTask exKey) -> key == exKey + do \_ -> throwIO TaskDisposed + do + action unmask + + -- The `action` has completed its work. + -- "disarm" dispose: + handleIf + do \(CancelTask exKey) -> key == exKey + do mempty -- ignore exception if it matches; this can only happen once + do + atomically $ readTVar stateVar >>= \case + TaskStateInitializing -> retry + TaskStateRunning _ -> writeTVar stateVar TaskStateCompleted + TaskStateThrowing -> retry -- Could not disarm so we have to wait for the exception to arrive + TaskStateCompleted -> pure () + + atomically do + putAsyncVarEitherSTM_ resultVar result + defaultRunFinalizers finalizers + + + atomically $ writeTVar stateVar $ TaskStateRunning threadId + + pure $ Task key stateVar finalizers (toAwaitable resultVar) unmanagedForkWithUnmask_ :: MonadIO m => ((forall b. IO b -> IO b) -> IO ()) -> m Disposable unmanagedForkWithUnmask_ action = toDisposable <$> unmanagedForkWithUnmask action diff --git a/src/Quasar/Utils/Exceptions.hs b/src/Quasar/Utils/Exceptions.hs index 66cbdce935c15af6da053ff861f75d933e5b530e..2bede2da24f349df4f92b1de59b2e8eb33dfe93e 100644 --- a/src/Quasar/Utils/Exceptions.hs +++ b/src/Quasar/Utils/Exceptions.hs @@ -5,7 +5,7 @@ module Quasar.Utils.Exceptions ( import Control.Exception import Data.Foldable (toList) -import Data.List.NonEmpty (NonEmpty, nonEmpty) +import Data.List.NonEmpty (NonEmpty) import Quasar.Prelude newtype CombinedException = CombinedException (NonEmpty SomeException) diff --git a/test/Quasar/DisposableSpec.hs b/test/Quasar/DisposableSpec.hs index c1f46f3e1d45213950e74feacf8f629b17ecc88d..90308ad5831cf76bd1818d94c96d693cde69f502 100644 --- a/test/Quasar/DisposableSpec.hs +++ b/test/Quasar/DisposableSpec.hs @@ -11,25 +11,25 @@ spec = parallel $ do describe "Disposable" $ do describe "noDisposable" $ do it "can be disposed" $ io do - await =<< dispose noDisposable + dispose noDisposable it "can be awaited" $ io do await (isDisposed noDisposable) describe "newDisposable" $ do it "signals it's disposed state" $ io do - disposable <- newDisposable $ pure $ pure () - void $ forkIO $ threadDelay 100000 >> disposeAndAwait disposable + disposable <- newDisposable $ pure () + void $ forkIO $ threadDelay 100000 >> dispose disposable await (isDisposed disposable) it "can be disposed multiple times" $ io do - disposable <- newDisposable $ pure $ pure () - disposeAndAwait disposable - disposeAndAwait disposable + disposable <- newDisposable $ pure () + dispose disposable + dispose disposable await (isDisposed disposable) it "can be disposed in parallel" $ do - disposable <- newDisposable $ pure () <$ threadDelay 100000 - void $ forkIO $ disposeAndAwait disposable - disposeAndAwait disposable + disposable <- newDisposable $ threadDelay 100000 + void $ forkIO $ dispose disposable + dispose disposable await (isDisposed disposable) diff --git a/test/Quasar/Observable/ObservableHashMapSpec.hs b/test/Quasar/Observable/ObservableHashMapSpec.hs index 5b658bcf676ab9238cf45afd309ede4593132d25..d18fde08931a709ca708af806df9a2ec2541dcbd 100644 --- a/test/Quasar/Observable/ObservableHashMapSpec.hs +++ b/test/Quasar/Observable/ObservableHashMapSpec.hs @@ -39,7 +39,7 @@ spec = parallel $ do OM.insert "key2" "value2" om lastCallbackShouldBe (HM.fromList [("key", "value"), ("key2", "value2")]) - disposeAndAwait subscriptionHandle + dispose subscriptionHandle lastCallbackShouldBe (HM.fromList [("key", "value"), ("key2", "value2")]) OM.insert "key3" "value3" om @@ -61,7 +61,7 @@ spec = parallel $ do OM.insert "key2" "value2" om lastDeltaShouldBe $ Insert "key2" "value2" - disposeAndAwait subscriptionHandle + dispose subscriptionHandle lastDeltaShouldBe $ Insert "key2" "value2" OM.insert "key3" "value3" om @@ -120,7 +120,7 @@ spec = parallel $ do v2ShouldBe $ Just "changed" retrieveIO om `shouldReturn` HM.singleton "key2" "changed" - disposeAndAwait handle2 + dispose handle2 OM.lookupDelete "key2" om `shouldReturn` Just "changed" v2ShouldBe $ Just "changed" diff --git a/test/Quasar/Observable/ObservablePrioritySpec.hs b/test/Quasar/Observable/ObservablePrioritySpec.hs index 45beb23917e64278aaecc2b8f33e41fd4a09e3b0..39bf3c031431e86d29fd53a621513e7c7a2831d4 100644 --- a/test/Quasar/Observable/ObservablePrioritySpec.hs +++ b/test/Quasar/Observable/ObservablePrioritySpec.hs @@ -22,9 +22,9 @@ spec = do retrieveIO op `shouldReturn` Just "p2" p1 <- OP.insertValue op 1 "p1" retrieveIO op `shouldReturn` Just "p2" - disposeAndAwait p2 + dispose p2 retrieveIO op `shouldReturn` Just "p1" - disposeAndAwait p1 + dispose p1 retrieveIO op `shouldReturn` Nothing it "sends updates when its value changes" $ do result <- newIORef [] @@ -40,9 +40,9 @@ spec = do mostRecentShouldBe (Just "p2") p1 <- OP.insertValue op 1 "p1" mostRecentShouldBe (Just "p2") - disposeAndAwait p2 + dispose p2 mostRecentShouldBe (Just "p1") - disposeAndAwait p1 + dispose p1 mostRecentShouldBe Nothing length <$> readIORef result `shouldReturn` 4 diff --git a/test/Quasar/ResourceManagerSpec.hs b/test/Quasar/ResourceManagerSpec.hs index e3b9e3b92fff341cc049325018ee7bb83949e5b0..9c7464a1d749ee50bbf2da6e228ec473e3d52c95 100644 --- a/test/Quasar/ResourceManagerSpec.hs +++ b/test/Quasar/ResourceManagerSpec.hs @@ -19,14 +19,17 @@ spec :: Spec spec = parallel $ do describe "ResourceManager" $ do it "can be created" $ io do - void newUnmanagedRootResourceManager + withRootResourceManager $ pure () it "can be created and disposed" $ io do - resourceManager <- newUnmanagedRootResourceManager - await =<< dispose resourceManager + withRootResourceManager do + resourceManager <- askResourceManager + disposeEventually_ resourceManager - it "can be created and disposed" $ io do - withRootResourceManager $ pure () + it "is disposed when exiting withRootResourceManager" $ io do + resourceManager <- withRootResourceManager askResourceManager + + peekAwaitable (isDisposed resourceManager) `shouldReturn` Just () it "can be created and disposed with a delay" $ io do withRootResourceManager $ liftIO $ threadDelay 100000 @@ -35,29 +38,16 @@ spec = parallel $ do withRootResourceManager do registerDisposable noDisposable - it "can attach an disposable" $ io do + it "can attach a dispose action" $ io do + var <- newTVarIO False withRootResourceManager do - avar <- newAsyncVar - registerDisposable $ alreadyDisposing avar - putAsyncVar_ avar () + registerDisposeAction $ atomically $ writeTVar var True - it "can dispose an awaitable that is completed asynchronously" $ io do - avar <- newAsyncVar - void $ forkIO $ do - threadDelay 100000 - putAsyncVar_ avar () - - withRootResourceManager do - registerDisposable (alreadyDisposing avar) - - it "can call a trivial dispose action" $ io do - withRootResourceManager do - registerDisposeAction $ pure $ pure () + atomically (readTVar var) `shouldReturn` True - it "can call a dispose action" $ io do + it "can attach a slow dispose action" $ io do withRootResourceManager do - avar <- newAsyncVar - registerDisposeAction $ toAwaitable avar <$ putAsyncVar_ avar () + registerDisposeAction $ threadDelay 100000 it "re-throws an exception" $ do shouldThrow @@ -88,17 +78,17 @@ spec = parallel $ do it "can attach an disposable that is disposed asynchronously" $ io do withRootResourceManager do - disposable <- captureDisposable_ $ registerDisposeAction $ pure () <$ threadDelay 100000 - liftIO $ void $ forkIO $ await =<< dispose disposable + disposable <- captureDisposable_ $ registerDisposeAction $ threadDelay 100000 + liftIO $ void $ forkIO $ dispose disposable it "does not abort disposing when encountering an exception" $ do var1 <- newTVarIO False var2 <- newTVarIO False (`shouldThrow` \(_ :: CombinedException) -> True) do withRootResourceManager do - registerDisposeAction $ pure () <$ (atomically (writeTVar var1 True)) - registerDisposeAction $ pure () <$ throwIO TestException - registerDisposeAction $ pure () <$ (atomically (writeTVar var2 True)) + registerDisposeAction $ atomically (writeTVar var1 True) + registerDisposeAction $ throwIO TestException + registerDisposeAction $ atomically (writeTVar var2 True) atomically (readTVar var1) `shouldReturn` True atomically (readTVar var2) `shouldReturn` True @@ -111,13 +101,41 @@ spec = parallel $ do sleepForever it "combines exceptions from resources with exceptions on the thread" $ io do - pendingWith "not implemented" (`shouldThrow` \(combinedExceptions -> exceptions) -> length exceptions == 2) do withRootResourceManager do rm <- askResourceManager liftIO $ throwToResourceManager rm TestException throwM TestException + it "can dispose a resource manager loop" $ io do + withRootResourceManager do + rm1 <- newResourceManager + rm2 <- newResourceManager + attachDisposable rm1 rm2 + attachDisposable rm2 rm1 + + it "can dispose a resource manager loop" $ io do + withRootResourceManager do + rm1 <- newResourceManager + rm2 <- newResourceManager + attachDisposable rm1 rm2 + attachDisposable rm2 rm1 + dispose rm1 + + it "can dispose a resource manager loop with a shared disposable" $ io do + var <- newTVarIO (0 :: Int) + d <- newDisposable $ atomically $ modifyTVar var (+ 1) + withRootResourceManager do + rm1 <- newResourceManager + rm2 <- newResourceManager + attachDisposable rm1 rm2 + attachDisposable rm2 rm1 + attachDisposable rm1 d + attachDisposable rm2 d + + atomically (readTVar var) `shouldReturn` 1 + + describe "linkExecution" do it "does not generate an exception after it is completed" $ io do (`shouldThrow` \(_ :: CombinedException) -> True) do