From 142894f9bd2133c5a3955381c7ae2ded052e6dc3 Mon Sep 17 00:00:00 2001 From: Jens Nolte <git@queezle.net> Date: Sun, 24 Oct 2021 01:05:54 +0200 Subject: [PATCH] New timer api with optional resource manager integration --- src/Quasar/Awaitable.hs | 4 +- src/Quasar/Disposable.hs | 75 +++++++++++++++++++---- src/Quasar/Timer.hs | 125 +++++++++++++++++++++------------------ 3 files changed, 134 insertions(+), 70 deletions(-) diff --git a/src/Quasar/Awaitable.hs b/src/Quasar/Awaitable.hs index 91de38d..65bcd7c 100644 --- a/src/Quasar/Awaitable.hs +++ b/src/Quasar/Awaitable.hs @@ -473,5 +473,5 @@ anySTM (x :| xs) = x `orElse` maybe retry anySTM (nonEmpty xs) -- | Like `awaitAny` with two awaitables. -awaitAny2 :: (IsAwaitable r a, MonadAwait m) => a -> a -> m r -awaitAny2 x y = awaitAny (x :| [y]) +awaitAny2 :: (IsAwaitable r a, IsAwaitable r b, MonadAwait m) => a -> b -> m r +awaitAny2 x y = awaitAny (toAwaitable x :| [toAwaitable y]) diff --git a/src/Quasar/Disposable.hs b/src/Quasar/Disposable.hs index 1b7b45c..678b7ba 100644 --- a/src/Quasar/Disposable.hs +++ b/src/Quasar/Disposable.hs @@ -9,6 +9,12 @@ module Quasar.Disposable ( newDisposable, noDisposable, + -- ** STM disposable + STMDisposable, + newSTMDisposable, + newSTMDisposable', + disposeSTMDisposable, + -- * Implementation internals DisposeResult(..), ResourceManagerResult(..), @@ -114,10 +120,10 @@ instance IsAwaitable () Disposable where -data ImmediateDisposable = ImmediateDisposable Unique (TMVar (IO ())) DisposableFinalizers (AsyncVar ()) +data IODisposable = IODisposable Unique (TMVar (IO ())) DisposableFinalizers (AsyncVar ()) -instance IsDisposable ImmediateDisposable where - beginDispose (ImmediateDisposable key actionVar finalizers resultVar) = do +instance IsDisposable IODisposable where + beginDispose (IODisposable key actionVar finalizers resultVar) = do -- This is only safe when run in masked state atomically (tryTakeTMVar actionVar) >>= mapM_ \action -> do result <- try action @@ -128,21 +134,66 @@ instance IsDisposable ImmediateDisposable where await resultVar pure DisposeResultDisposed - isDisposed (ImmediateDisposable _ _ _ resultVar) = toAwaitable resultVar `catchAll` \_ -> pure () - - registerFinalizer (ImmediateDisposable _ _ finalizers _) = defaultRegisterFinalizer finalizers - -newImmediateDisposable :: MonadIO m => IO () -> m Disposable -newImmediateDisposable disposeAction = liftIO do - key <- newUnique - fmap toDisposable $ ImmediateDisposable key <$> newTMVarIO disposeAction <*> newDisposableFinalizers <*> newAsyncVar + isDisposed (IODisposable _ _ _ resultVar) = toAwaitable resultVar `catchAll` \_ -> pure () + registerFinalizer (IODisposable _ _ finalizers _) = defaultRegisterFinalizer finalizers -- | Create a new disposable from an IO action. Is is guaranteed, that the IO action will only be called once (even when -- `dispose` is called multiple times). +-- +-- The action must not block for an unbound time. newDisposable :: MonadIO m => IO () -> m Disposable -newDisposable = newImmediateDisposable +newDisposable disposeAction = liftIO do + key <- newUnique + fmap toDisposable $ IODisposable key <$> newTMVarIO disposeAction <*> newDisposableFinalizers <*> newAsyncVar + + +data STMDisposable = STMDisposable Unique (TMVar (STM ())) DisposableFinalizers (AsyncVar ()) + +instance IsDisposable STMDisposable where + beginDispose (STMDisposable key actionVar finalizers resultVar) = do + -- This is only safe when run in masked state + atomically (tryTakeTMVar actionVar) >>= mapM_ \action -> do + atomically do + result <- try action + putAsyncVarEitherSTM_ resultVar result + defaultRunFinalizers finalizers + -- Await so concurrent `beginDispose` calls don't exit too early + await resultVar + pure DisposeResultDisposed + + isDisposed (STMDisposable _ _ _ resultVar) = toAwaitable resultVar `catchAll` \_ -> pure () + + registerFinalizer (STMDisposable _ _ finalizers _) = defaultRegisterFinalizer finalizers + +-- | Create a new disposable from an STM action. Is is guaranteed, that the STM action will only be called once (even +-- when `dispose` is called multiple times). +-- +-- The action must not block (retry) for an unbound time. +newSTMDisposable :: MonadIO m => STM () -> m Disposable +newSTMDisposable disposeAction = toDisposable <$> newSTMDisposable' disposeAction + +-- | Create a new disposable from an STM action. Is is guaranteed, that the STM action will only be called once (even +-- when `dispose` is called multiple times). +-- +-- The action must not block (retry) for an unbound time. +-- +-- This variant of `newSTMDisposable` returns an unboxed `STMDisposable` which can be disposed from `STM` by using +-- `disposeSTMDisposable`. +newSTMDisposable' :: MonadIO m => STM () -> m STMDisposable +newSTMDisposable' disposeAction = liftIO do + key <- newUnique + STMDisposable key <$> newTMVarIO disposeAction <*> newDisposableFinalizers <*> newAsyncVar + +disposeSTMDisposable :: STMDisposable -> STM () +disposeSTMDisposable (STMDisposable key actionVar finalizers resultVar) = do + tryTakeTMVar actionVar >>= \case + Just action -> do + result <- try action + putAsyncVarEitherSTM_ resultVar result + defaultRunFinalizers finalizers + Nothing -> readAsyncVarSTM resultVar data EmptyDisposable = EmptyDisposable diff --git a/src/Quasar/Timer.hs b/src/Quasar/Timer.hs index 8bf24f3..99571bd 100644 --- a/src/Quasar/Timer.hs +++ b/src/Quasar/Timer.hs @@ -3,15 +3,18 @@ module Quasar.Timer ( Timer, newTimer, + newUnmanagedTimer, sleepUntil, TimerScheduler, newTimerScheduler, + newUnmanagedTimerScheduler, TimerCancelled, Delay, newDelay, + newUnmanagedDelay, ) where import Control.Concurrent @@ -20,6 +23,7 @@ import Control.Monad.Catch import Data.Heap import Data.Time.Clock (UTCTime, diffUTCTime, getCurrentTime) import Data.Foldable (toList) +import Quasar.Async import Quasar.Awaitable import Quasar.Disposable import Quasar.Prelude @@ -37,6 +41,7 @@ data Timer = Timer { key :: Unique, time :: UTCTime, completed :: AsyncVar (), + disposable :: STMDisposable, scheduler :: TimerScheduler } @@ -47,33 +52,21 @@ instance Ord Timer where x `compare` y = time x `compare` time y instance IsDisposable Timer where - beginDispose = undefined - - --dispose self = liftIO do - -- atomically do - -- cancelled <- failAsyncVarSTM (completed self) TimerCancelled - -- when cancelled do - -- modifyTVar (activeCount (scheduler self)) (+ (-1)) - -- modifyTVar (cancelledCount (scheduler self)) (+ 1) - -- pure $ isDisposed self - - isDisposed = awaitSuccessOrFailure . completed - - registerFinalizer = undefined + toDisposable Timer{disposable} = toDisposable disposable instance IsAwaitable () Timer where - toAwaitable = toAwaitable . completed + toAwaitable Timer{completed} = toAwaitable completed data TimerScheduler = TimerScheduler { - heap :: TVar (Heap Timer), + heap :: TMVar (Heap Timer), activeCount :: TVar Int, cancelledCount :: TVar Int, - resourceManager :: ResourceManager + disposable :: Disposable } instance IsDisposable TimerScheduler where - toDisposable = toDisposable . resourceManager + toDisposable TimerScheduler{disposable} = disposable data TimerSchedulerDisposed = TimerSchedulerDisposed deriving stock (Eq, Show) @@ -81,30 +74,27 @@ data TimerSchedulerDisposed = TimerSchedulerDisposed instance Exception TimerSchedulerDisposed newTimerScheduler :: MonadResourceManager m => m TimerScheduler -newTimerScheduler = do - resourceManager <- newResourceManager +newTimerScheduler = registerNewResource newUnmanagedTimerScheduler + +newUnmanagedTimerScheduler :: MonadIO m => m TimerScheduler +newUnmanagedTimerScheduler = do liftIO do - heap <- newTVarIO empty + heap <- newTMVarIO empty activeCount <- newTVarIO 0 cancelledCount <- newTVarIO 0 - let scheduler = TimerScheduler { - heap, - activeCount, - cancelledCount, - resourceManager - } - startSchedulerThread scheduler - pure scheduler - -startSchedulerThread :: TimerScheduler -> IO () -startSchedulerThread scheduler = do - mask_ do - onResourceManager (resourceManager scheduler) do - registerDisposable =<< unmanagedFork schedulerThread + mfix \scheduler -> do + disposable <- startSchedulerThread scheduler + pure TimerScheduler { + heap, + activeCount, + cancelledCount, + disposable + } + +startSchedulerThread :: TimerScheduler -> IO Disposable +startSchedulerThread scheduler = unmanagedFork_ (schedulerThread `finally` cancelAll) where - resourceManager' :: ResourceManager - resourceManager' = resourceManager scheduler - heap' :: TVar (Heap Timer) + heap' :: TMVar (Heap Timer) heap' = heap scheduler activeCount' = activeCount scheduler cancelledCount' = cancelledCount scheduler @@ -114,7 +104,7 @@ startSchedulerThread scheduler = do -- Get next timer (blocks when heap is empty) nextTimer <- atomically do - uncons <$> readTVar heap' >>= \case + uncons <$> readTMVar heap' >>= \case Nothing -> retry Just (timer, _) -> pure timer @@ -129,17 +119,18 @@ startSchedulerThread scheduler = do wait :: Timer -> Int -> IO () wait nextTimer microseconds = do - delay <- onResourceManager resourceManager' $ toAwaitable <$> newDelay microseconds + delay <- newUnmanagedDelay microseconds awaitAny2 delay nextTimerChanged + dispose delay where nextTimerChanged :: Awaitable () nextTimerChanged = unsafeAwaitSTM do - minTimer <- Data.Heap.minimum <$> readTVar heap' + minTimer <- Data.Heap.minimum <$> readTMVar heap' unless (minTimer /= nextTimer) retry fireTimers :: UTCTime -> IO () fireTimers now = atomically do - writeTVar heap' =<< go =<< readTVar heap' + putTMVar heap' =<< go =<< takeTMVar heap' doCleanup <- liftA2 (>) (readTVar cancelledCount') (readTVar activeCount') when doCleanup cleanup where @@ -150,13 +141,18 @@ startSchedulerThread scheduler = do Just (timer, others) -> do if (time timer) <= now then do - result <- putAsyncVarSTM (completed timer) () - modifyTVar (if result then activeCount' else cancelledCount') (+ (-1)) + fireTimer timer pure others else pure timers + fireTimer :: Timer -> STM () + fireTimer Timer{completed, disposable} = do + result <- putAsyncVarSTM completed () + modifyTVar (if result then activeCount' else cancelledCount') (+ (-1)) + disposeSTMDisposable disposable + cleanup :: STM () - cleanup = writeTVar heap' . fromList =<< mapMaybeM cleanupTimer =<< (toList <$> readTVar heap') + cleanup = putTMVar heap' . fromList =<< mapMaybeM cleanupTimer =<< (toList <$> takeTMVar heap') cleanupTimer :: Timer -> STM (Maybe Timer) cleanupTimer timer = do @@ -167,26 +163,43 @@ startSchedulerThread scheduler = do pure Nothing else pure $ Just timer + cancelAll :: IO () + cancelAll = do + timers <- atomically $ takeTMVar heap' + mapM_ dispose timers + +newTimer :: MonadResourceManager m => TimerScheduler -> UTCTime -> m Timer +newTimer scheduler time = + registerNewResource $ newUnmanagedTimer scheduler time -newTimer :: TimerScheduler -> UTCTime -> IO Timer -newTimer scheduler time = do + +newUnmanagedTimer :: MonadIO m => TimerScheduler -> UTCTime -> m Timer +newUnmanagedTimer scheduler time = liftIO do key <- newUnique completed <- newAsyncVar - let timer = Timer { key, time, completed, scheduler } + disposable <- newSTMDisposable' do + cancelled <- failAsyncVarSTM completed TimerCancelled + when cancelled do + modifyTVar (activeCount scheduler) (+ (-1)) + modifyTVar (cancelledCount scheduler) (+ 1) + let timer = Timer { key, time, completed, disposable, scheduler } atomically do - modifyTVar (heap scheduler) (insert timer) + tryTakeTMVar (heap scheduler) >>= \case + Just timers -> putTMVar (heap scheduler) (insert timer timers) + Nothing -> throwM TimerSchedulerDisposed modifyTVar (activeCount scheduler) (+ 1) pure timer -sleepUntil :: TimerScheduler -> UTCTime -> IO () -sleepUntil scheduler time = bracketOnError (newTimer scheduler time) dispose await +sleepUntil :: MonadIO m => TimerScheduler -> UTCTime -> m () +sleepUntil scheduler time = liftIO $ bracketOnError (newUnmanagedTimer scheduler time) dispose await --- | Can be awaited successfully after a given number of microseconds. Based on `threadDelay`, but provides an --- `IsAwaitable` and `IsDisposable` instance. +-- | Provides an `IsAwaitable` instance that can be awaited successfully after a given number of microseconds. +-- +-- Based on `threadDelay`. Provides a `IsAwaitable` and a `IsDisposable` instance. newtype Delay = Delay (Task ()) deriving newtype IsDisposable @@ -194,10 +207,10 @@ instance IsAwaitable () Delay where toAwaitable (Delay task) = toAwaitable task `catch` \TaskDisposed -> throwM TimerCancelled newDelay :: MonadResourceManager m => Int -> m Delay -newDelay microseconds = mask_ do - delay <- Delay <$> unmanagedFork (liftIO (threadDelay microseconds)) - registerDisposable delay - pure delay +newDelay microseconds = registerNewResource $ newUnmanagedDelay microseconds + +newUnmanagedDelay :: MonadIO m => Int -> m Delay +newUnmanagedDelay microseconds = Delay <$> unmanagedFork (liftIO (threadDelay microseconds)) -- GitLab