diff --git a/src/Quasar/Async.hs b/src/Quasar/Async.hs index 578b7afe783da43842b092211d13f4908a3300f9..ed1745b2e438d8d5aa2f49500a4775fcef96ef66 100644 --- a/src/Quasar/Async.hs +++ b/src/Quasar/Async.hs @@ -32,21 +32,25 @@ import Quasar.ResourceManager -- | TODO: Documentation -- -- The action will be run with asynchronous exceptions unmasked. -async :: MonadResourceManager m => ResourceManagerIO a -> m (Async a) +async :: (MonadResourceManager m, MonadIO m, MonadMask m) => ResourceManagerIO a -> m (Async a) async action = asyncWithUnmask \unmask -> unmask action -- | TODO: Documentation -- -- The action will be run with asynchronous exceptions masked and will be passed an action that can be used to unmask. -asyncWithUnmask :: MonadResourceManager m => ((ResourceManagerIO a -> ResourceManagerIO a) -> ResourceManagerIO r) -> m (Async r) +asyncWithUnmask + :: (MonadResourceManager m, MonadIO m, MonadMask m) + => ((ResourceManagerIO a -> ResourceManagerIO a) + -> ResourceManagerIO r) + -> m (Async r) asyncWithUnmask action = do resourceManager <- askResourceManager asyncWithHandlerAndUnmask (throwToResourceManager resourceManager . AsyncException) action -async_ :: MonadResourceManager m => ResourceManagerIO () -> m () +async_ :: (MonadResourceManager m, MonadIO m, MonadMask m) => ResourceManagerIO () -> m () async_ action = void $ async action -asyncWithUnmask_ :: MonadResourceManager m => ((ResourceManagerIO a -> ResourceManagerIO a) -> ResourceManagerIO ()) -> m () +asyncWithUnmask_ :: (MonadResourceManager m, MonadIO m, MonadMask m) => ((ResourceManagerIO a -> ResourceManagerIO a) -> ResourceManagerIO ()) -> m () asyncWithUnmask_ action = void $ asyncWithUnmask action -- | TODO: Documentation @@ -54,7 +58,7 @@ asyncWithUnmask_ action = void $ asyncWithUnmask action -- The action will be run with asynchronous exceptions unmasked. When an exception is thrown that is not caused from -- the disposable instance (i.e. the task being canceled), the handler is called with that exception. asyncWithHandlerAndUnmask - :: MonadResourceManager m + :: (MonadResourceManager m, MonadIO m, MonadMask m) => (SomeException -> IO ()) -> ((ResourceManagerIO a -> ResourceManagerIO a) -> ResourceManagerIO r) @@ -74,19 +78,19 @@ asyncWithHandlerAndUnmask handler action = do liftIO $ unmask $ onResourceManager resourceManager innerAction asyncWithHandlerAndUnmask_ - :: MonadResourceManager m + :: (MonadResourceManager m, MonadIO m, MonadMask m) => (SomeException -> IO ()) -> ((ResourceManagerIO a -> ResourceManagerIO a) -> ResourceManagerIO r) -> m () asyncWithHandlerAndUnmask_ handler action = void $ asyncWithHandlerAndUnmask handler action -asyncWithHandler :: MonadResourceManager m => (SomeException -> IO ()) -> ResourceManagerIO r -> m (Async r) +asyncWithHandler :: (MonadResourceManager m, MonadIO m, MonadMask m) => (SomeException -> IO ()) -> ResourceManagerIO r -> m (Async r) asyncWithHandler handler action = asyncWithHandlerAndUnmask handler \unmask -> unmask action -asyncWithHandler_ :: MonadResourceManager m => (SomeException -> IO ()) -> ResourceManagerIO r -> m () +asyncWithHandler_ :: (MonadResourceManager m, MonadIO m, MonadMask m) => (SomeException -> IO ()) -> ResourceManagerIO r -> m () asyncWithHandler_ handler action = void $ asyncWithHandler handler action -withAsync :: MonadResourceManager m => ResourceManagerIO r -> (Async r -> m a) -> m a +withAsync :: (MonadResourceManager m, MonadIO m, MonadMask m) => ResourceManagerIO r -> (Async r -> m a) -> m a withAsync action = bracket (async action) dispose diff --git a/src/Quasar/Disposable.hs b/src/Quasar/Disposable.hs index a79a00de129f61bd4713acd66c56f00c0175e96f..a9a32dd3dcfbca994f4fcce6ce66a9a519e943da 100644 --- a/src/Quasar/Disposable.hs +++ b/src/Quasar/Disposable.hs @@ -151,10 +151,10 @@ instance IsDisposable IODisposable where -- `dispose` is called multiple times). -- -- The action must not block for an unbound time. -newDisposable :: MonadIO m => IO () -> m Disposable -newDisposable disposeAction = liftIO do - key <- newUnique - fmap toDisposable $ IODisposable key <$> newTMVarIO disposeAction <*> newDisposableFinalizers <*> newAsyncVar +newDisposable :: IO () -> STM Disposable +newDisposable disposeAction = do + key <- newUniqueSTM + fmap toDisposable $ IODisposable key <$> newTMVar disposeAction <*> newDisposableFinalizersSTM <*> newAsyncVarSTM data AsyncDisposable = AsyncDisposable Unique (TMVar (IO ())) DisposableFinalizers (AsyncVar ()) @@ -178,10 +178,10 @@ instance IsDisposable AsyncDisposable where -- action will only be called once (even when `dispose` is called multiple times). -- -- The action must not block for an unbound time. -newAsyncDisposable :: MonadIO m => IO () -> m Disposable -newAsyncDisposable disposeAction = liftIO do - key <- newUnique - fmap toDisposable $ AsyncDisposable key <$> newTMVarIO disposeAction <*> newDisposableFinalizers <*> newAsyncVar +newAsyncDisposable :: IO () -> STM Disposable +newAsyncDisposable disposeAction = do + key <- newUniqueSTM + fmap toDisposable $ AsyncDisposable key <$> newTMVar disposeAction <*> newDisposableFinalizersSTM <*> newAsyncVarSTM diff --git a/src/Quasar/Observable.hs b/src/Quasar/Observable.hs index 7d4118b6efbee3dc42481d28b2cb6f4f967128c7..37a0bc84a996b1ba164ce47dc3058e7e8fcf90d5 100644 --- a/src/Quasar/Observable.hs +++ b/src/Quasar/Observable.hs @@ -69,7 +69,7 @@ toObservableUpdate (ObservableNotAvailable ex) = throwM ex class IsRetrievable v a | a -> v where - retrieve :: MonadResourceManager m => a -> m (Awaitable v) + retrieve :: (MonadResourceManager m, MonadIO m, MonadMask m) => a -> m (Awaitable v) class IsRetrievable v o => IsObservable v o | o -> v where -- | Register a callback to observe changes. The callback is called when the value changes, but depending on the @@ -82,7 +82,7 @@ class IsRetrievable v o => IsObservable v o | o -> v where -- processed immediately, use `observeBlocking` instead or manually pass the value to a thread that processes the -- data, e.g. by using STM. observe - :: MonadResourceManager m + :: (MonadResourceManager m, MonadIO m, MonadMask m) => o -- ^ observable -> (ObservableMessage v -> ResourceManagerIO ()) -- ^ callback -> m () @@ -103,7 +103,11 @@ class IsRetrievable v o => IsObservable v o | o -> v where -- -- The handler is allowed to block. When the value changes while the handler is running the handler will be run again -- after it completes; when the value changes multiple times it will only be executed once (with the latest value). -observeBlocking :: (IsObservable v o, MonadResourceManager m) => o -> (ObservableMessage v -> m ()) -> m a +observeBlocking + :: (IsObservable v o, MonadResourceManager m, MonadIO m, MonadMask m) + => o + -> (ObservableMessage v -> m ()) + -> m a observeBlocking observable handler = do -- `withScopedResourceManager` removes the `observe` callback when the `handler` fails. withScopedResourceManager do @@ -124,7 +128,11 @@ data ObserveWhileCompleted = ObserveWhileCompleted instance Exception ObserveWhileCompleted -- | Observe until the callback returns `Just`. -observeWhile :: (IsObservable v o, MonadResourceManager m) => o -> (ObservableMessage v -> m (Maybe a)) -> m a +observeWhile + :: (IsObservable v o, MonadResourceManager m, MonadIO m, MonadMask m) + => o + -> (ObservableMessage v -> m (Maybe a)) + -> m a observeWhile observable callback = do resultVar <- liftIO $ newIORef unreachableCodePath observeWhile_ observable \msg -> do @@ -138,7 +146,11 @@ observeWhile observable callback = do -- | Observe until the callback returns `False`. -observeWhile_ :: (IsObservable v o, MonadResourceManager m) => o -> (ObservableMessage v -> m Bool) -> m () +observeWhile_ + :: (IsObservable v o, MonadResourceManager m, MonadIO m, MonadMask m) + => o + -> (ObservableMessage v -> m Bool) + -> m () observeWhile_ observable callback = catch do @@ -201,11 +213,11 @@ data BindObservable r = forall a. BindObservable (Observable a) (a -> Observable instance IsRetrievable r (BindObservable r) where retrieve (BindObservable fx fn) = do - x <- await =<< retrieve fx - retrieve $ fn x + awaitable <- retrieve fx + value <- liftIO $ await awaitable + retrieve $ fn value instance IsObservable r (BindObservable r) where - observe :: MonadResourceManager m => (BindObservable r) -> (ObservableMessage r -> ResourceManagerIO ()) -> m () observe (BindObservable fx fn) callback = do disposableVar <- liftIO $ newTMVarIO noDisposable keyVar <- liftIO $ newTMVarIO =<< newUnique @@ -247,7 +259,6 @@ instance IsRetrievable r (CatchObservable e r) where retrieve (CatchObservable fx fn) = retrieve fx `catch` \ex -> retrieve (fn ex) instance IsObservable r (CatchObservable e r) where - observe :: MonadResourceManager m => (CatchObservable e r) -> (ObservableMessage r -> ResourceManagerIO ()) -> m () observe (CatchObservable fx fn) callback = do disposableVar <- liftIO $ newTMVarIO noDisposable keyVar <- liftIO $ newTMVarIO =<< newUnique @@ -266,7 +277,8 @@ instance IsObservable r (CatchObservable e r) where disposeEventually_ oldDisposable disposable <- case message of - (ObservableNotAvailable (fromException -> Just ex)) -> captureDisposable_ $ observe (fn ex) (rightCallback keyVar key) + (ObservableNotAvailable (fromException -> Just ex)) -> + captureDisposable_ $ observe (fn ex) (rightCallback keyVar key) msg -> noDisposable <$ callback msg liftIO $ atomically $ putTMVar disposableVar disposable @@ -289,17 +301,18 @@ instance IsRetrievable v (ObservableVar v) where instance IsObservable v (ObservableVar v) where observe observable@(ObservableVar mvar) callback = do resourceManager <- askResourceManager - key <- liftIO newUnique - registerNewResource_ do + registerNewResource_ $ liftIO do let wrappedCallback = enterResourceManager resourceManager . callback - liftIO $ modifyMVar_ mvar $ \(state, subscribers) -> do + key <- liftIO newUnique + + modifyMVar_ mvar $ \(state, subscribers) -> do -- Call listener with initial value wrappedCallback (pure state) pure (state, HM.insert key wrappedCallback subscribers) - newDisposable $ disposeFn key + atomically $ newDisposable $ disposeFn key where disposeFn :: Unique -> IO () disposeFn key = modifyMVar_ mvar (\(state, subscribers) -> pure (state, HM.delete key subscribers)) diff --git a/src/Quasar/Observable/ObservableHashMap.hs b/src/Quasar/Observable/ObservableHashMap.hs index c05a1fc5173530c9db97d036298ecfd5eb079d33..4a3884870960557829df96fe143e221de9e02a3d 100644 --- a/src/Quasar/Observable/ObservableHashMap.hs +++ b/src/Quasar/Observable/ObservableHashMap.hs @@ -10,6 +10,7 @@ module Quasar.Observable.ObservableHashMap ( lookupDelete, ) where +import Control.Concurrent.STM (atomically) import Data.HashMap.Strict qualified as HM import Quasar.Disposable import Quasar.Observable @@ -53,7 +54,7 @@ instance IsDeltaObservable k v (ObservableHashMap k v) where callback (Reset $ toHashMap handle) key <- newUnique let handle' = handle {deltaSubscribers = HM.insert key callback (deltaSubscribers handle)} - (handle',) <$> newDisposable (unsubscribe key) + (handle',) <$> atomically (newDisposable (unsubscribe key)) unsubscribe :: Unique -> IO () unsubscribe key = modifyHandle_ (\handle -> pure handle {deltaSubscribers = HM.delete key (deltaSubscribers handle)}) ohm diff --git a/src/Quasar/Observable/ObservablePriority.hs b/src/Quasar/Observable/ObservablePriority.hs index d341cf61b5a9f76ba42b80cbbb01c34eecb079fe..25eac7783e5ad5fe0d793a503ff6bfa973501dd0 100644 --- a/src/Quasar/Observable/ObservablePriority.hs +++ b/src/Quasar/Observable/ObservablePriority.hs @@ -4,6 +4,7 @@ module Quasar.Observable.ObservablePriority ( insertValue, ) where +import Control.Concurrent.STM (atomically) import Data.HashMap.Strict qualified as HM import Data.List (maximumBy) import Data.List.NonEmpty (NonEmpty(..), nonEmpty) @@ -63,7 +64,7 @@ insertValue :: forall p v m. MonadIO m => (Ord p, Hashable p) => ObservablePrior insertValue (ObservablePriority mvar) priority value = liftIO $ modifyMVar mvar $ \internals -> do key <- newUnique newInternals <- insertValue' key internals - (newInternals,) <$> newDisposable (removeValue key) + (newInternals,) <$> atomically (newDisposable (removeValue key)) where insertValue' :: Unique -> Internals p v -> IO (Internals p v) insertValue' key internals@Internals{priorityMap, current} diff --git a/src/Quasar/ResourceManager.hs b/src/Quasar/ResourceManager.hs index be1a0288bd359dc963ff1b4216f3944721502f5e..07e805770063beba1627fb200821002260b17ebe 100644 --- a/src/Quasar/ResourceManager.hs +++ b/src/Quasar/ResourceManager.hs @@ -3,6 +3,7 @@ module Quasar.ResourceManager ( MonadResourceManager(..), ResourceManagerT, ResourceManagerIO, + ResourceManagerSTM, FailedToRegisterResource, registerNewResource, registerNewResource_, @@ -15,6 +16,7 @@ module Quasar.ResourceManager ( captureDisposable_, disposeOnError, liftResourceManagerIO, + runInResourceManagerSTM, enterResourceManager, lockResourceManager, @@ -86,15 +88,9 @@ class IsDisposable a => IsResourceManager a where -- | Attaches an `Disposable` to a ResourceManager. It will automatically be disposed when the resource manager is disposed. -- -- May throw an `FailedToRegisterResource` if the resource manager is disposing/disposed. - attachDisposable :: (IsDisposable b, MonadIO m) => a -> b -> m () + attachDisposable :: IsDisposable b => a -> b -> STM () attachDisposable self = attachDisposable (toResourceManager self) - -- | Attaches an `Disposable` to a ResourceManager. It will automatically be disposed when the resource manager is disposed. - -- - -- May throw an `FailedToRegisterResource` if the resource manager is disposing/disposed. - attachDisposableSTM :: IsDisposable b => a -> b -> STM () - attachDisposableSTM self = attachDisposableSTM (toResourceManager self) - lockResourceManagerImpl :: (MonadIO m, MonadMask m) => a -> m b -> m b lockResourceManagerImpl self = lockResourceManagerImpl (toResourceManager self) @@ -102,36 +98,40 @@ class IsDisposable a => IsResourceManager a where throwToResourceManager :: Exception e => a -> e -> IO () throwToResourceManager = throwToResourceManager . toResourceManager - {-# MINIMAL toResourceManager | (attachDisposable, attachDisposableSTM, lockResourceManagerImpl, throwToResourceManager) #-} + {-# MINIMAL toResourceManager | (attachDisposable, lockResourceManagerImpl, throwToResourceManager) #-} data ResourceManager = forall a. IsResourceManager a => ResourceManager a instance IsResourceManager ResourceManager where toResourceManager = id attachDisposable (ResourceManager x) = attachDisposable x - attachDisposableSTM (ResourceManager x) = attachDisposableSTM x lockResourceManagerImpl (ResourceManager x) = lockResourceManagerImpl x throwToResourceManager (ResourceManager x) = throwToResourceManager x instance IsDisposable ResourceManager where toDisposable (ResourceManager x) = toDisposable x -class (MonadAwait m, MonadMask m, MonadIO m, MonadFix m) => MonadResourceManager m where + +class MonadFix m => MonadResourceManager m where -- | Get the underlying resource manager. askResourceManager :: m ResourceManager -- | Replace the resource manager for a computation. localResourceManager :: IsResourceManager a => a -> m r -> m r + -- | Locks the resource manager. As long as the resource manager is locked, it's possible to register new resources + -- on the resource manager. + -- + -- This prevents the resource manager from disposing, so the computation must not block for an unbound amount of time. + lockResourceManager :: MonadResourceManager m => m a -> m a + -- | Run an `STM` computation. Depending on the monad this may be run in a dedicated STM transaction or may be + -- embedded in a larger transaction. + runInSTM :: MonadResourceManager m => STM a -> m a --- | Locks the resource manager. As long as the resource manager is locked, it's possible to register new resources --- on the resource manager. --- --- This prevents the resource manager from disposing, so the computation must not block for an unbound amount of time. -lockResourceManager :: MonadResourceManager m => m a -> m a -lockResourceManager action = do +runInResourceManagerSTM :: MonadResourceManager m => ResourceManagerSTM a -> m a +runInResourceManagerSTM action = do resourceManager <- askResourceManager - lockResourceManagerImpl resourceManager action + runInSTM $ runReaderT action resourceManager -- | Register a `Disposable` to the resource manager. -- @@ -139,42 +139,53 @@ lockResourceManager action = do registerDisposable :: (IsDisposable a, MonadResourceManager m) => a -> m () registerDisposable disposable = do resourceManager <- askResourceManager - attachDisposable resourceManager disposable + runInSTM $ attachDisposable resourceManager disposable registerDisposeAction :: MonadResourceManager m => IO () -> m () -registerDisposeAction disposeAction = mask_ $ registerDisposable =<< newDisposable disposeAction +registerDisposeAction disposeAction = runInResourceManagerSTM do + disposable <- lift (newDisposable disposeAction) + registerDisposable disposable registerAsyncDisposeAction :: MonadResourceManager m => IO () -> m () -registerAsyncDisposeAction disposeAction = mask_ $ registerDisposable =<< newAsyncDisposable disposeAction +registerAsyncDisposeAction disposeAction = runInResourceManagerSTM do + disposable <- lift (newAsyncDisposable disposeAction) + registerDisposable disposable -- | Locks the resource manager (which may fail), runs the computation and registeres the resulting disposable. -- -- The computation will be run in masked state. -- -- The computation must not block for an unbound amount of time. -registerNewResource :: (IsDisposable a, MonadResourceManager m) => m a -> m a +registerNewResource :: (IsDisposable a, MonadResourceManager m, MonadIO m, MonadMask m) => m a -> m a registerNewResource action = mask_ $ lockResourceManager do resource <- action registerDisposable resource pure resource -registerNewResource_ :: (IsDisposable a, MonadResourceManager m) => m a -> m () +registerNewResource_ :: (IsDisposable a, MonadResourceManager m, MonadIO m, MonadMask m) => m a -> m () registerNewResource_ action = void $ registerNewResource action -withScopedResourceManager :: MonadResourceManager m => m a -> m a +withScopedResourceManager :: (MonadResourceManager m, MonadIO m, MonadMask m) => m a -> m a withScopedResourceManager action = bracket newResourceManager dispose \scope -> localResourceManager scope action type ResourceManagerT = ReaderT ResourceManager type ResourceManagerIO = ResourceManagerT IO +type ResourceManagerSTM = ResourceManagerT STM instance (MonadAwait m, MonadMask m, MonadIO m, MonadFix m) => MonadResourceManager (ResourceManagerT m) where localResourceManager resourceManager = local (const (toResourceManager resourceManager)) askResourceManager = ask + lockResourceManager action = do + resourceManager <- askResourceManager + lockResourceManagerImpl resourceManager action + + runInSTM action = liftIO $ atomically action + instance {-# OVERLAPPABLE #-} MonadResourceManager m => MonadResourceManager (ReaderT r m) where askResourceManager = lift askResourceManager @@ -183,13 +194,32 @@ instance {-# OVERLAPPABLE #-} MonadResourceManager m => MonadResourceManager (Re x <- ask lift $ localResourceManager resourceManager $ runReaderT action x + lockResourceManager action = do + x <- ask + lift $ lockResourceManager $ runReaderT action x + + runInSTM action = lift $ runInSTM action + -- TODO MonadResourceManager instances for StateT, WriterT, RWST, MaybeT, ... +-- Overlaps the ResourceManagerT-instance, because `MonadIO` _could_ be specified for `STM` (which would be very +-- very incorrect, so this is safe). +instance {-# OVERLAPS #-} MonadResourceManager (ResourceManagerT STM) where + localResourceManager resourceManager = local (const (toResourceManager resourceManager)) + + askResourceManager = ask + + -- | No-op, since STM is always executed atomically. + lockResourceManager = id + + runInSTM action = lift action + + onResourceManager :: (IsResourceManager a, MonadIO m) => a -> ResourceManagerIO r -> m r onResourceManager target action = liftIO $ runReaderT action (toResourceManager target) -liftResourceManagerIO :: MonadResourceManager m => ResourceManagerIO r -> m r +liftResourceManagerIO :: (MonadResourceManager m, MonadIO m) => ResourceManagerIO r -> m r liftResourceManagerIO action = do resourceManager <- askResourceManager onResourceManager resourceManager action @@ -205,7 +235,7 @@ captureDisposable_ :: MonadResourceManager m => m () -> m Disposable captureDisposable_ = snd <<$>> captureDisposable -- | Disposes all resources created by the computation if the computation throws an exception. -disposeOnError :: MonadResourceManager m => m a -> m a +disposeOnError :: (MonadResourceManager m, MonadIO m, MonadMask m) => m a -> m a disposeOnError action = do bracketOnError newResourceManager @@ -234,7 +264,6 @@ data RootResourceManager instance IsResourceManager RootResourceManager where attachDisposable (RootResourceManager internal _ _ _) = attachDisposable internal - attachDisposableSTM (RootResourceManager internal _ _ _) = attachDisposableSTM internal lockResourceManagerImpl (RootResourceManager internal _ _ _) = lockResourceManagerImpl internal throwToResourceManager (RootResourceManager _ _ exceptionsVar _) ex = do -- TODO only log exceptions after a timeout @@ -326,9 +355,7 @@ data ResourceManagerState instance IsResourceManager DefaultResourceManager where throwToResourceManager DefaultResourceManager{throwToHandler} = throwToHandler . toException - attachDisposable self disposable = liftIO $ atomically $ attachDisposableSTM self disposable - - attachDisposableSTM DefaultResourceManager{stateVar, disposablesVar} disposable = do + attachDisposable DefaultResourceManager{stateVar, disposablesVar} disposable = do key <- newUniqueSTM state <- readTVar stateVar case state of @@ -480,30 +507,31 @@ newUnmanagedDefaultResourceManagerInternal parentResourceManager = do } newResourceManager :: MonadResourceManager m => m ResourceManager -newResourceManager = mask_ do +newResourceManager = do parent <- askResourceManager - resourceManager <- liftIO $ atomically $ toResourceManager <$> newUnmanagedDefaultResourceManagerInternal parent - registerDisposable resourceManager - pure resourceManager + runInResourceManagerSTM do + resourceManager <- lift $ toResourceManager <$> newUnmanagedDefaultResourceManagerInternal parent + registerDisposable resourceManager + pure resourceManager newResourceManagerSTM :: ResourceManager -> STM ResourceManager newResourceManagerSTM parent = do resourceManager <- toResourceManager <$> newUnmanagedDefaultResourceManagerInternal parent - attachDisposableSTM parent resourceManager + attachDisposable parent resourceManager pure resourceManager -- * Utilities -- | Creates an `Disposable` that is bound to a ResourceManager. It will automatically be disposed when the resource manager is disposed. -attachDisposeAction :: MonadIO m => ResourceManager -> IO () -> m Disposable -attachDisposeAction resourceManager action = liftIO $ mask_ $ do +attachDisposeAction :: ResourceManager -> IO () -> STM Disposable +attachDisposeAction resourceManager action = do disposable <- newDisposable action attachDisposable resourceManager disposable pure disposable -- | Attaches a dispose action to a ResourceManager. It will automatically be run when the resource manager is disposed. -attachDisposeAction_ :: MonadIO m => ResourceManager -> IO () -> m () +attachDisposeAction_ :: ResourceManager -> IO () -> STM () attachDisposeAction_ resourceManager action = void $ attachDisposeAction resourceManager action @@ -525,7 +553,7 @@ data LinkState = LinkStateLinked ThreadId | LinkStateThrowing | LinkStateComplet -- -- The computation is executed on the current thread. When the resource manager is disposed before the computation -- is completed, a `CancelLinkedExecution`-exception is thrown to the current thread. -linkExecution :: MonadResourceManager m => m a -> m (Maybe a) +linkExecution :: (MonadResourceManager m, MonadIO m, MonadMask m) => m a -> m (Maybe a) linkExecution action = do key <- liftIO $ newUnique var <- liftIO $ newTVarIO =<< LinkStateLinked <$> myThreadId diff --git a/src/Quasar/Subscribable.hs b/src/Quasar/Subscribable.hs index e053481c83077e92eed6c20d817d52180ed1eaa1..8e6abe6c72b277fcb57d5df2ced45be24e1d5f8e 100644 --- a/src/Quasar/Subscribable.hs +++ b/src/Quasar/Subscribable.hs @@ -31,9 +31,9 @@ class IsSubscribable r a | a -> r where toSubscribable x = Subscribable x subscribe - :: MonadResourceManager m + :: (MonadResourceManager m, MonadIO m, MonadMask m) => a - -> (forall f. MonadResourceManager f => SubscribableMessage r -> f (Awaitable ())) + -> (SubscribableMessage r -> ResourceManagerIO (Awaitable ())) -> m () subscribe x = subscribe (toSubscribable x) {-# MINIMAL toSubscribable | subscribe #-} @@ -71,10 +71,11 @@ instance IsSubscribable r (SubscribableEvent r) where subscribe (SubscribableEvent tvar) callback = mask_ do key <- liftIO newUnique resourceManager <- askResourceManager - liftIO $ atomically do - callbackMap <- readTVar tvar - writeTVar tvar $ HM.insert key (\msg -> runReaderT (callback msg) resourceManager) callbackMap - registerDisposable =<< newDisposable (disposeFn key) + runInResourceManagerSTM do + registerDisposable =<< lift do + callbackMap <- readTVar tvar + writeTVar tvar $ HM.insert key (\msg -> runReaderT (callback msg) resourceManager) callbackMap + newDisposable (disposeFn key) where disposeFn :: Unique -> IO () disposeFn key = atomically do diff --git a/src/Quasar/Timer.hs b/src/Quasar/Timer.hs index caa0714be6598d1834deeeafbf791ec8f32b325d..e00da88790cb61bacc68ab1dfcc0773dcf44b00e 100644 --- a/src/Quasar/Timer.hs +++ b/src/Quasar/Timer.hs @@ -73,7 +73,7 @@ data TimerSchedulerDisposed = TimerSchedulerDisposed instance Exception TimerSchedulerDisposed -newTimerScheduler :: MonadResourceManager m => m TimerScheduler +newTimerScheduler :: (MonadResourceManager m, MonadIO m, MonadMask m) => m TimerScheduler newTimerScheduler = registerNewResource newUnmanagedTimerScheduler newUnmanagedTimerScheduler :: MonadIO m => m TimerScheduler @@ -169,7 +169,7 @@ startSchedulerThread scheduler = toDisposable <$> unmanagedAsync (schedulerThrea mapM_ dispose timers -newTimer :: MonadResourceManager m => TimerScheduler -> UTCTime -> m Timer +newTimer :: (MonadResourceManager m, MonadIO m, MonadMask m) => TimerScheduler -> UTCTime -> m Timer newTimer scheduler time = registerNewResource $ newUnmanagedTimer scheduler time @@ -206,7 +206,7 @@ newtype Delay = Delay (Async ()) instance IsAwaitable () Delay where toAwaitable (Delay task) = toAwaitable task `catch` \AsyncDisposed -> throwM TimerCancelled -newDelay :: MonadResourceManager m => Int -> m Delay +newDelay :: (MonadResourceManager m, MonadIO m, MonadMask m) => Int -> m Delay newDelay microseconds = registerNewResource $ newUnmanagedDelay microseconds newUnmanagedDelay :: MonadIO m => Int -> m Delay diff --git a/src/Quasar/Timer/PosixTimer.hsc b/src/Quasar/Timer/PosixTimer.hsc index efc461f010a34d05d957293e52b28ca24ebb12a2..ba9e098114e45af1b40b952545723daa0f60eb0c 100644 --- a/src/Quasar/Timer/PosixTimer.hsc +++ b/src/Quasar/Timer/PosixTimer.hsc @@ -15,6 +15,8 @@ module Quasar.Timer.PosixTimer ( ) where import Control.Concurrent +import Control.Monad.Catch (MonadMask) +import Control.Monad.STM (atomically) import Foreign import Foreign.C import Quasar.Disposable @@ -164,7 +166,7 @@ instance IsDisposable PosixTimer where toDisposable = disposable -newPosixTimer :: MonadResourceManager m => ClockId -> IO () -> m PosixTimer +newPosixTimer :: (MonadResourceManager m, MonadIO m, MonadMask m) => ClockId -> IO () -> m PosixTimer newPosixTimer clockId callback = registerNewResource do liftIO $ newUnmanagedPosixTimer clockId callback @@ -179,7 +181,7 @@ newUnmanagedPosixTimer clockId callback = runInBoundThread do c_timer_create (toCClockId clockId) sigevent ctimerPtr peek ctimerPtr - disposable <- newDisposable (delete ctimer callbackPtr) + disposable <- atomically $ newDisposable (delete ctimer callbackPtr) pure $ PosixTimer { ctimer, disposable } where diff --git a/src/Quasar/Timer/TimerFd.hs b/src/Quasar/Timer/TimerFd.hs index 7921259c80710bdba689b2876b456a53fe6c23bb..31fa49b6c2f4cd601cfed06184757008c0a66ec8 100644 --- a/src/Quasar/Timer/TimerFd.hs +++ b/src/Quasar/Timer/TimerFd.hs @@ -43,7 +43,7 @@ newtype TimerFd = TimerFd Fd deriving stock (Eq, Show) deriving newtype Num -newTimerFd :: MonadResourceManager m => ClockId -> IO () -> m TimerFd +newTimerFd :: (MonadResourceManager m, MonadIO m, MonadMask m) => ClockId -> IO () -> m TimerFd newTimerFd clockId callback = mask_ do timer <- liftIO $ runInBoundThread do throwErrnoIfMinus1 "timerfd_create" do diff --git a/test/Quasar/DisposableSpec.hs b/test/Quasar/DisposableSpec.hs index 90308ad5831cf76bd1818d94c96d693cde69f502..087e61ee2965abce3f21f03d0b2788fe7633f6fe 100644 --- a/test/Quasar/DisposableSpec.hs +++ b/test/Quasar/DisposableSpec.hs @@ -1,6 +1,7 @@ module Quasar.DisposableSpec (spec) where import Control.Concurrent +import Control.Concurrent.STM import Quasar.Prelude import Test.Hspec import Quasar.Awaitable @@ -18,18 +19,18 @@ spec = parallel $ do describe "newDisposable" $ do it "signals it's disposed state" $ io do - disposable <- newDisposable $ pure () + disposable <- atomically $ newDisposable $ pure () void $ forkIO $ threadDelay 100000 >> dispose disposable await (isDisposed disposable) it "can be disposed multiple times" $ io do - disposable <- newDisposable $ pure () + disposable <- atomically $ newDisposable $ pure () dispose disposable dispose disposable await (isDisposed disposable) it "can be disposed in parallel" $ do - disposable <- newDisposable $ threadDelay 100000 + disposable <- atomically $ newDisposable $ threadDelay 100000 void $ forkIO $ dispose disposable dispose disposable await (isDisposed disposable) diff --git a/test/Quasar/ResourceManagerSpec.hs b/test/Quasar/ResourceManagerSpec.hs index 0d31b58a28a2e86a72fd2f5b13e2b30a5f74e3cb..6224d531a1e930c9aa2d8cb5778b34b45089c3ee 100644 --- a/test/Quasar/ResourceManagerSpec.hs +++ b/test/Quasar/ResourceManagerSpec.hs @@ -111,27 +111,30 @@ spec = parallel $ do withRootResourceManager do rm1 <- newResourceManager rm2 <- newResourceManager - attachDisposable rm1 rm2 - attachDisposable rm2 rm1 + liftIO $ atomically do + attachDisposable rm1 rm2 + attachDisposable rm2 rm1 it "can dispose a resource manager loop" $ io do withRootResourceManager do rm1 <- newResourceManager rm2 <- newResourceManager - attachDisposable rm1 rm2 - attachDisposable rm2 rm1 + liftIO $ atomically do + attachDisposable rm1 rm2 + attachDisposable rm2 rm1 dispose rm1 it "can dispose a resource manager loop with a shared disposable" $ io do var <- newTVarIO (0 :: Int) - d <- newDisposable $ atomically $ modifyTVar var (+ 1) + d <- atomically $ newDisposable $ atomically $ modifyTVar var (+ 1) withRootResourceManager do rm1 <- newResourceManager rm2 <- newResourceManager - attachDisposable rm1 rm2 - attachDisposable rm2 rm1 - attachDisposable rm1 d - attachDisposable rm2 d + liftIO $ atomically do + attachDisposable rm1 rm2 + attachDisposable rm2 rm1 + attachDisposable rm1 d + attachDisposable rm2 d atomically (readTVar var) `shouldReturn` 1