diff --git a/quasar.cabal b/quasar.cabal index a8f079f85255cc571b9306750e1bd5a5435807a4..6c5b8dd71b5aafb3fe968453a6ea39e7db6c18cc 100644 --- a/quasar.cabal +++ b/quasar.cabal @@ -72,11 +72,13 @@ library exceptions, ghc-prim, hashable, + heaps, microlens-platform, mtl, record-hasfield, stm, template-haskell, + time, transformers, unordered-containers, exposed-modules: @@ -89,6 +91,7 @@ library Quasar.Observable.ObservablePriority Quasar.Prelude Quasar.PreludeExtras + Quasar.Timer Quasar.Utils.ExtraT hs-source-dirs: src diff --git a/src/Quasar/Timer.hs b/src/Quasar/Timer.hs new file mode 100644 index 0000000000000000000000000000000000000000..90ecc50722dbb6d58f06c100722bc0559932680c --- /dev/null +++ b/src/Quasar/Timer.hs @@ -0,0 +1,186 @@ +{-# LANGUAGE MultiWayIf #-} + +module Quasar.Timer ( + Timer, + newTimer, + sleepUntil, + + TimerScheduler, + newTimerScheduler, + + TimerCancelled, + NegativeTimeJump, + + Delay, + newDelay, +) where + +import Control.Concurrent +import Control.Concurrent.STM +import Control.Monad.Catch +import Data.Heap +import Data.Ord (comparing) +import Data.Time.Clock (UTCTime, getCurrentTime, diffUTCTime, utctDayTime, addUTCTime) +import Data.Time.Clock.POSIX (posixSecondsToUTCTime) +import Quasar.Async +import Quasar.Awaitable +import Quasar.Disposable +import Quasar.Prelude + + +data TimerCancelled = TimerCancelled + deriving stock (Eq, Show) + +instance Exception TimerCancelled + + +data NegativeTimeJump = NegativeTimeJump + deriving stock (Eq, Show) + +instance Exception NegativeTimeJump + + + +data Timer = Timer { + key :: Unique, + time :: UTCTime, + completed :: AsyncVar (), + scheduler :: TimerScheduler +} + +instance Eq Timer where + x == y = key x == key y + +instance Ord Timer where + x `compare` y = time x `compare` time y + +instance IsDisposable Timer where + dispose self = do + atomically do + cancelled <- failAsyncVarSTM (completed self) TimerCancelled + when cancelled do + modifyTVar (activeCount (scheduler self)) (+ (-1)) + modifyTVar (cancelledCount (scheduler self)) (+ 1) + pure $ isDisposed self + + isDisposed = awaitSuccessOrFailure . completed + +instance IsAwaitable () Timer where + toAwaitable = toAwaitable . completed + + +data TimerScheduler = TimerScheduler { + heap :: TVar (Heap Timer), + activeCount :: TVar Int, + cancelledCount :: TVar Int, + resourceManager :: ResourceManager +} + +instance IsDisposable TimerScheduler where + toDisposable = toDisposable . resourceManager + +data TimerSchedulerDisposed = TimerSchedulerDisposed + deriving stock (Eq, Show) + +instance Exception TimerSchedulerDisposed + +newTimerScheduler :: ResourceManager -> IO TimerScheduler +newTimerScheduler parentResourceManager = do + heap <- newTVarIO empty + activeCount <- newTVarIO 0 + cancelledCount <- newTVarIO 0 + resourceManager <- newResourceManager parentResourceManager + let scheduler = TimerScheduler { + heap, + activeCount, + cancelledCount, + resourceManager + } + startSchedulerThread scheduler + pure scheduler + +startSchedulerThread :: TimerScheduler -> IO () +startSchedulerThread scheduler = do + mask_ do + threadId <- forkIOWithUnmask ($ schedulerThread) + attachDisposeAction_ (resourceManager scheduler) do + throwTo threadId TimerSchedulerDisposed + pure $ pure () + where + resourceManager' :: ResourceManager + resourceManager' = resourceManager scheduler + heap' :: TVar (Heap Timer) + heap' = heap scheduler + + schedulerThread :: IO () + schedulerThread = forever do + + -- Get next timer (blocks when heap is empty) + nextTimer <- atomically do + uncons <$> readTVar heap' >>= \case + Nothing -> retry + Just (timer, _) -> pure timer + + now <- getCurrentTime + + -- TODO sleep using Posix/Linux create_timer using CLOCK_REALTIME + let timeUntil = diffUTCTime (time nextTimer) now + if + | timeUntil <= 0 -> fireTimers now + | timeUntil < 60 -> wait nextTimer (ceiling $ toRational timeUntil * 1000000) + | otherwise -> wait nextTimer (60 * 1000000) + + wait :: Timer -> Int -> IO () + wait nextTimer microseconds = do + delay <- toAwaitable <$> newDelay resourceManager' microseconds + await $ awaitAny2 delay nextTimerChanged + where + nextTimerChanged :: Awaitable () + nextTimerChanged = unsafeAwaitSTM do + min <- Data.Heap.minimum <$> readTVar heap' + unless (min /= nextTimer) retry + + fireTimers :: UTCTime -> IO () + fireTimers now = atomically do + writeTVar heap' =<< go =<< readTVar heap' + where + go :: Heap Timer -> STM (Heap Timer) + go timers = do + case uncons timers of + Nothing -> pure timers + Just (timer, others) -> do + if (time timer) <= now + then do + result <- putAsyncVarSTM (completed timer) () + modifyTVar ((if result then activeCount else cancelledCount) scheduler) (+ (-1)) + pure others + else pure timers + + +newTimer :: TimerScheduler -> UTCTime -> IO Timer +newTimer scheduler time = do + key <- newUnique + completed <- newAsyncVar + let timer = Timer { key, time, completed, scheduler } + atomically do + modifyTVar (heap scheduler) (insert timer) + modifyTVar (activeCount scheduler) (+ 1) + pure timer + + +sleepUntil :: TimerScheduler -> UTCTime -> IO () +sleepUntil scheduler time = bracketOnError (newTimer scheduler time) disposeIO await + + + + +-- | Can be awaited successfully after a given number of microseconds. Based on `threadDelay`, but provides an +-- `IsAwaitable` and `IsDisposable` instance. +newtype Delay = Delay (Task ()) + deriving newtype IsDisposable + +instance IsAwaitable () Delay where + toAwaitable (Delay task) = toAwaitable task `catch` \TaskDisposed -> throwM TimerCancelled + +newDelay :: ResourceManager -> Int -> IO Delay +newDelay resourceManager microseconds = onResourceManager resourceManager $ Delay <$> async (liftIO (threadDelay microseconds))