Skip to content
Snippets Groups Projects
Commit 8c015531 authored by Jens Nolte's avatar Jens Nolte
Browse files

Add transactional disposer which can be disposed STM


Co-authored-by: default avatarJan Beinke <git@janbeinke.com>
parent 2384b110
No related branches found
No related tags found
No related merge requests found
......@@ -30,6 +30,8 @@ module Quasar.Resources (
-- * Types to implement resources
-- ** Disposer
Disposer,
TDisposer,
disposeTDisposer,
newUnmanagedIODisposer,
newUnmanagedSTMDisposer,
trivialDisposer,
......@@ -79,7 +81,7 @@ registerDisposeActionIO fn = quasarAtomically $ registerDisposeAction fn
registerDisposeActionIO_ :: (MonadQuasar m, MonadIO m) => IO () -> m ()
registerDisposeActionIO_ fn = quasarAtomically $ void $ registerDisposeAction fn
registerDisposeTransaction :: (MonadQuasar m, MonadSTM m) => STM () -> m Disposer
registerDisposeTransaction :: (MonadQuasar m, MonadSTM m) => STM () -> m TDisposer
registerDisposeTransaction fn = do
worker <- askIOWorker
exChan <- askExceptionSink
......@@ -88,12 +90,12 @@ registerDisposeTransaction fn = do
disposer <- newUnmanagedSTMDisposer fn worker exChan
attachResource rm disposer
pure disposer
{-# SPECIALIZE registerDisposeTransaction :: STM () -> QuasarSTM Disposer #-}
{-# SPECIALIZE registerDisposeTransaction :: STM () -> QuasarSTM TDisposer #-}
registerDisposeTransaction_ :: (MonadQuasar m, MonadSTM m) => STM () -> m ()
registerDisposeTransaction_ fn = liftQuasarSTM $ void $ registerDisposeTransaction fn
registerDisposeTransactionIO :: (MonadQuasar m, MonadIO m) => STM () -> m Disposer
registerDisposeTransactionIO :: (MonadQuasar m, MonadIO m) => STM () -> m TDisposer
registerDisposeTransactionIO fn = quasarAtomically $ registerDisposeTransaction fn
registerDisposeTransactionIO_ :: (MonadQuasar m, MonadIO m) => STM () -> m ()
......
......@@ -11,6 +11,9 @@ module Quasar.Resources.Disposer (
newUnmanagedSTMDisposer,
trivialDisposer,
TDisposer,
disposeTDisposer,
-- * Resource manager
ResourceManager,
newUnmanagedResourceManagerSTM,
......@@ -20,7 +23,6 @@ module Quasar.Resources.Disposer (
import Control.Monad (foldM)
import Control.Monad.Catch
import Data.Either (isRight)
import Data.HashMap.Strict (HashMap)
import Data.HashMap.Strict qualified as HM
import Data.HashSet (HashSet)
......@@ -53,26 +55,100 @@ instance Resource Disposer where
isDisposed (Disposer ds) = foldMap isDisposed ds
isDisposing (Disposer ds) = awaitAny $ isDisposing <$> ds
newtype TDisposer = TDisposer [TDisposerElement]
deriving newtype (Semigroup, Monoid)
instance Resource TDisposer where
toDisposer (TDisposer ds) = toDisposer ds
type DisposerState = TOnce DisposeFn (Future ())
data DisposerElement
= FnDisposer Unique TIOWorker ExceptionSink DisposerState Finalizers
= IODisposer Unique TIOWorker ExceptionSink DisposerState Finalizers
| STMDisposer TDisposerElement
| ResourceManagerDisposer ResourceManager
instance Resource DisposerElement where
toDisposer disposer = Disposer [disposer]
isDisposed (FnDisposer _ _ _ state _) = join (toFuture state)
isDisposed (IODisposer _ _ _ state _) = join (toFuture state)
isDisposed (STMDisposer tdisposer) = isDisposed tdisposer
isDisposed (ResourceManagerDisposer resourceManager) = resourceManagerIsDisposed resourceManager
isDisposing (FnDisposer _ _ _ state _) = unsafeAwaitSTM (check . isRight =<< readTOnceState state)
isDisposing (IODisposer _ _ _ state _) = void (toFuture state)
isDisposing (STMDisposer tdisposer) = isDisposing tdisposer
isDisposing (ResourceManagerDisposer resourceManager) = resourceManagerIsDisposing resourceManager
type DisposeFn = ShortIO (Future ())
type STMDisposerState = TOnce (STM ()) (Future ())
data TDisposerElement = TDisposerElement Unique TIOWorker ExceptionSink STMDisposerState Finalizers
newUnmanagedSTMDisposer :: STM () -> TIOWorker -> ExceptionSink -> STM TDisposer
newUnmanagedSTMDisposer fn worker sink = do
key <- newUniqueSTM
element <- TDisposerElement key worker sink <$> newTOnce fn <*> newFinalizers
pure $ TDisposer [element]
instance Resource TDisposerElement where
toDisposer disposer = Disposer [STMDisposer disposer]
isDisposed (TDisposerElement _ _ _ state _) = join (toFuture state)
isDisposing (TDisposerElement _ _ _ state _) = void (toFuture state)
instance Resource [TDisposerElement] where
toDisposer tds = Disposer (STMDisposer <$> tds)
isDisposed tds = isDisposed (toDisposer tds)
isDisposing tds = isDisposing (toDisposer tds)
disposeTDisposer :: TDisposer -> STM ()
disposeTDisposer (TDisposer elements) = mapM_ go elements
where
go (TDisposerElement _ _ sink state finalizers) = do
future <- mapFinalizeTOnce state startDisposeFn
-- Elements can also be disposed by a resource manager (on a dedicated thread).
-- In that case that thread has to be awaited (otherwise this is a no-op).
awaitSTM future
where
startDisposeFn :: STM () -> STM (Future ())
startDisposeFn disposeFn = do
disposeFn `catchAll` throwToExceptionSink sink
runFinalizers finalizers
pure $ pure ()
beginDisposeSTMDisposer :: TDisposerElement -> STM (Future ())
beginDisposeSTMDisposer (TDisposerElement _ worker sink state finalizers) =
mapFinalizeTOnce state startDisposeFn
where
startDisposeFn :: STM () -> STM (Future ())
startDisposeFn fn = do
awaitableVar <- newPromiseSTM
startShortIOSTM_ (runDisposeFn awaitableVar disposeFn) worker sink
pure $ join (toFuture awaitableVar)
where
disposeFn :: ShortIO (Future ())
disposeFn = unsafeShortIO $ atomically $
-- Spawn a thread only if the transaction retries
(pure <$> fn) `orElse` forkAsyncSTM (atomically fn) worker sink
runDisposeFn :: Promise (Future ()) -> DisposeFn -> ShortIO ()
runDisposeFn awaitableVar disposeFn = mask_ $ handleAll exceptionHandler do
awaitable <- disposeFn
fulfillPromiseShortIO awaitableVar awaitable
runFinalizersAfter finalizers awaitable
where
-- In case of an exception mark disposable as completed to prevent resource managers from being stuck indefinitely
exceptionHandler :: SomeException -> ShortIO ()
exceptionHandler ex = do
fulfillPromiseShortIO awaitableVar (pure ())
runFinalizersShortIO finalizers
throwM $ DisposeException ex
-- | A trivial disposer that does not perform any action when disposed.
trivialDisposer :: Disposer
trivialDisposer = mempty
......@@ -80,21 +156,12 @@ trivialDisposer = mempty
newUnmanagedPrimitiveDisposer :: ShortIO (Future ()) -> TIOWorker -> ExceptionSink -> STM Disposer
newUnmanagedPrimitiveDisposer fn worker exChan = toDisposer <$> do
key <- newUniqueSTM
FnDisposer key worker exChan <$> newTOnce fn <*> newFinalizers
IODisposer key worker exChan <$> newTOnce fn <*> newFinalizers
newUnmanagedIODisposer :: IO () -> TIOWorker -> ExceptionSink -> STM Disposer
-- TODO change TIOWorker behavior for spawning threads, so no `unsafeShortIO` is necessary
newUnmanagedIODisposer fn worker exChan = newUnmanagedPrimitiveDisposer (unsafeShortIO $ forkFuture fn exChan) worker exChan
newUnmanagedSTMDisposer :: STM () -> TIOWorker -> ExceptionSink -> STM Disposer
newUnmanagedSTMDisposer fn worker exChan = newUnmanagedPrimitiveDisposer disposeFn worker exChan
where
disposeFn :: ShortIO (Future ())
disposeFn = unsafeShortIO $ atomically $
-- Spawn a thread only if the transaction retries
(pure <$> fn) `orElse` forkAsyncSTM (atomically fn) worker exChan
dispose :: (MonadIO m, Resource r) => r -> m ()
......@@ -104,8 +171,9 @@ disposeEventuallySTM :: Resource r => r -> STM (Future ())
disposeEventuallySTM (toDisposer -> Disposer ds) = mconcat <$> mapM f ds
where
f :: DisposerElement -> STM (Future ())
f (FnDisposer _ worker exChan state finalizers) =
f (IODisposer _ worker exChan state finalizers) =
beginDisposeFnDisposer worker exChan state finalizers
f (STMDisposer disposer) = beginDisposeSTMDisposer disposer
f (ResourceManagerDisposer resourceManager) =
beginDisposeResourceManager resourceManager
......@@ -139,12 +207,14 @@ beginDisposeFnDisposer worker exChan disposeState finalizers =
throwM $ DisposeException ex
disposerKey :: DisposerElement -> Unique
disposerKey (FnDisposer key _ _ _ _) = key
disposerKey (IODisposer key _ _ _ _) = key
disposerKey (STMDisposer (TDisposerElement key _ _ _ _)) = key
disposerKey (ResourceManagerDisposer resourceManager) = resourceManagerKey resourceManager
disposerFinalizers :: DisposerElement -> Finalizers
disposerFinalizers (FnDisposer _ _ _ _ finalizers) = finalizers
disposerFinalizers (IODisposer _ _ _ _ finalizers) = finalizers
disposerFinalizers (STMDisposer (TDisposerElement _ _ _ _ finalizers)) = finalizers
disposerFinalizers (ResourceManagerDisposer rm) = resourceManagerFinalizers rm
......@@ -248,8 +318,10 @@ beginDisposeResourceManagerInternal rm = do
rmKey = resourceManagerKey rm
resourceManagerBeginDispose :: DisposerElement -> STM DisposeResult
resourceManagerBeginDispose (FnDisposer _ worker exChan state finalizers) =
resourceManagerBeginDispose (IODisposer _ worker exChan state finalizers) =
DisposeResultAwait <$> beginDisposeFnDisposer worker exChan state finalizers
resourceManagerBeginDispose (STMDisposer disposer) =
DisposeResultAwait <$> beginDisposeSTMDisposer disposer
resourceManagerBeginDispose (ResourceManagerDisposer resourceManager) =
DisposeResultDependencies <$> beginDisposeResourceManagerInternal resourceManager
......
......@@ -179,7 +179,7 @@ newUnmanagedTimer scheduler time = liftIO do
key <- newUnique
completed <- newPromise
atomically do
disposer <- newUnmanagedSTMDisposer (disposeFn completed) (ioWorker scheduler) (exceptionSink scheduler)
disposer <- toDisposer <$> newUnmanagedSTMDisposer (disposeFn completed) (ioWorker scheduler) (exceptionSink scheduler)
let timer = Timer { key, time, completed, disposer, scheduler }
tryTakeTMVar (heap scheduler) >>= \case
Just timers -> putTMVar (heap scheduler) (insert timer timers)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment