From f26555180b1233444e10c249bc44ae0fc77dfb1a Mon Sep 17 00:00:00 2001 From: Jens Nolte <git@queezle.net> Date: Sat, 12 Feb 2022 00:57:26 +0100 Subject: [PATCH] Switch Timer module to use MonadQuasar --- src/Quasar/Timer.hs | 101 +++++++++++++++++++++----------------------- 1 file changed, 48 insertions(+), 53 deletions(-) diff --git a/src/Quasar/Timer.hs b/src/Quasar/Timer.hs index b426f7c..0c344e2 100644 --- a/src/Quasar/Timer.hs +++ b/src/Quasar/Timer.hs @@ -8,13 +8,11 @@ module Quasar.Timer ( TimerScheduler, newTimerScheduler, - newUnmanagedTimerScheduler, TimerCancelled, Delay, newDelay, - newUnmanagedDelay, ) where import Control.Concurrent @@ -23,12 +21,13 @@ import Control.Monad.Catch import Data.Heap import Data.Time.Clock (UTCTime, diffUTCTime, getCurrentTime) import Data.Foldable (toList) -import Quasar.Async -import Quasar.Async.Unmanaged +import Quasar.Async.STMHelper +import Quasar.Async.V2 import Quasar.Awaitable -import Quasar.Disposable +import Quasar.Exceptions +import Quasar.Monad import Quasar.Prelude -import Quasar.ResourceManager +import Quasar.Resources data TimerCancelled = TimerCancelled @@ -41,7 +40,7 @@ data Timer = Timer { key :: Unique, time :: UTCTime, completed :: AsyncVar (), - disposable :: STMDisposable, + disposer :: Disposer, scheduler :: TimerScheduler } @@ -51,8 +50,8 @@ instance Eq Timer where instance Ord Timer where x `compare` y = time x `compare` time y -instance IsDisposable Timer where - toDisposable Timer{disposable} = toDisposable disposable +instance Resource Timer where + getDisposer Timer{disposer} = disposer instance IsAwaitable () Timer where toAwaitable Timer{completed} = toAwaitable completed @@ -62,65 +61,67 @@ data TimerScheduler = TimerScheduler { heap :: TMVar (Heap Timer), activeCount :: TVar Int, cancelledCount :: TVar Int, - disposable :: Disposable + disposer :: Disposer, + ioWorker :: TIOWorker, + exceptionChannel :: ExceptionChannel } -instance IsDisposable TimerScheduler where - toDisposable TimerScheduler{disposable} = disposable +instance Resource TimerScheduler where + getDisposer TimerScheduler{disposer} = disposer data TimerSchedulerDisposed = TimerSchedulerDisposed deriving stock (Eq, Show) instance Exception TimerSchedulerDisposed -newTimerScheduler :: (MonadResourceManager m, MonadIO m) => m TimerScheduler -newTimerScheduler = registerNewResource newUnmanagedTimerScheduler - -newUnmanagedTimerScheduler :: MonadIO m => m TimerScheduler -newUnmanagedTimerScheduler = do - liftIO do - heap <- newTMVarIO empty - activeCount <- newTVarIO 0 - cancelledCount <- newTVarIO 0 - mfix \scheduler -> do - disposable <- startSchedulerThread scheduler - pure TimerScheduler { - heap, - activeCount, - cancelledCount, - disposable - } - -startSchedulerThread :: TimerScheduler -> IO Disposable -startSchedulerThread scheduler = toDisposable <$> unmanagedAsync (schedulerThread `finally` cancelAll) +newTimerScheduler :: (MonadQuasar m, MonadIO m) => m TimerScheduler +newTimerScheduler = liftQuasarIO do + heap <- liftIO $ newTMVarIO empty + activeCount <- liftIO $ newTVarIO 0 + cancelledCount <- liftIO $ newTVarIO 0 + ioWorker <- askIOWorker + exceptionChannel <- askExceptionChannel + mfix \scheduler -> do + disposer <- startSchedulerThread scheduler + pure TimerScheduler { + heap, + activeCount, + cancelledCount, + disposer, + ioWorker, + exceptionChannel + } + +startSchedulerThread :: TimerScheduler -> QuasarIO Disposer +startSchedulerThread scheduler = getDisposer <$> async (schedulerThread `finally` liftIO cancelAll) where heap' :: TMVar (Heap Timer) heap' = heap scheduler activeCount' = activeCount scheduler cancelledCount' = cancelledCount scheduler - schedulerThread :: IO () + schedulerThread :: QuasarIO () schedulerThread = forever do -- Get next timer (blocks when heap is empty) - nextTimer <- atomically do + nextTimer <- liftIO $ atomically do mNext <- uncons <$> readTMVar heap' case mNext of Nothing -> retry Just (timer, _) -> pure timer - now <- getCurrentTime + now <- liftIO getCurrentTime -- TODO sleep using Posix/Linux create_timer using CLOCK_REALTIME let timeUntil = diffUTCTime (time nextTimer) now if - | timeUntil <= 0 -> fireTimers now + | timeUntil <= 0 -> liftIO $ fireTimers now | timeUntil < 60 -> wait nextTimer (ceiling $ toRational timeUntil * 1000000) | otherwise -> wait nextTimer (60 * 1000000) - wait :: Timer -> Int -> IO () + wait :: Timer -> Int -> QuasarIO () wait nextTimer microseconds = do - delay <- newUnmanagedDelay microseconds + delay <- newDelay microseconds awaitAny2 (await delay) nextTimerChanged dispose delay where @@ -147,10 +148,10 @@ startSchedulerThread scheduler = toDisposable <$> unmanagedAsync (schedulerThrea else pure timers fireTimer :: Timer -> STM () - fireTimer Timer{completed, disposable} = do + fireTimer Timer{completed, disposer} = do result <- putAsyncVarSTM completed () modifyTVar (if result then activeCount' else cancelledCount') (+ (-1)) - disposeSTMDisposable disposable + disposeEventuallySTM_ disposer cleanup :: STM () cleanup = putTMVar heap' . fromList =<< mapMaybeM cleanupTimer . toList =<< takeTMVar heap' @@ -170,9 +171,8 @@ startSchedulerThread scheduler = toDisposable <$> unmanagedAsync (schedulerThrea mapM_ dispose timers -newTimer :: (MonadResourceManager m, MonadIO m) => TimerScheduler -> UTCTime -> m Timer -newTimer scheduler time = - registerNewResource $ newUnmanagedTimer scheduler time +newTimer :: (MonadQuasar m, MonadIO m) => TimerScheduler -> UTCTime -> m Timer +newTimer scheduler time = registerNewResource $ newUnmanagedTimer scheduler time newUnmanagedTimer :: MonadIO m => TimerScheduler -> UTCTime -> m Timer @@ -180,12 +180,12 @@ newUnmanagedTimer scheduler time = liftIO do key <- newUnique completed <- newAsyncVar atomically do - disposable <- newSTMDisposable' do + disposer <- newSTMDisposer (ioWorker scheduler) (exceptionChannel scheduler) do cancelled <- failAsyncVarSTM completed TimerCancelled when cancelled do modifyTVar (activeCount scheduler) (+ (-1)) modifyTVar (cancelledCount scheduler) (+ 1) - let timer = Timer { key, time, completed, disposable, scheduler } + let timer = Timer { key, time, completed, disposer, scheduler } tryTakeTMVar (heap scheduler) >>= \case Just timers -> putTMVar (heap scheduler) (insert timer timers) Nothing -> throwM TimerSchedulerDisposed @@ -197,22 +197,17 @@ sleepUntil :: MonadIO m => TimerScheduler -> UTCTime -> m () sleepUntil scheduler time = liftIO $ bracketOnError (newUnmanagedTimer scheduler time) dispose await - -- | Provides an `IsAwaitable` instance that can be awaited successfully after a given number of microseconds. -- -- Based on `threadDelay`. Provides a `IsAwaitable` and a `IsDisposable` instance. newtype Delay = Delay (Async ()) - deriving newtype IsDisposable + deriving newtype Resource instance IsAwaitable () Delay where toAwaitable (Delay task) = toAwaitable task `catch` \AsyncDisposed -> throwM TimerCancelled -newDelay :: (MonadResourceManager m, MonadIO m) => Int -> m Delay -newDelay microseconds = registerNewResource $ newUnmanagedDelay microseconds - -newUnmanagedDelay :: MonadIO m => Int -> m Delay -newUnmanagedDelay microseconds = Delay <$> unmanagedAsync (liftIO (threadDelay microseconds)) - +newDelay :: MonadQuasar m => Int -> m Delay +newDelay microseconds = Delay <$> async (liftIO (threadDelay microseconds)) -- From package `extra` -- GitLab