{-# LANGUAGE MultiWayIf #-}

module Quasar.Timer (
  Timer,
  newTimer,
  sleepUntil,

  TimerScheduler,
  newTimerScheduler,

  TimerCancelled,

  Delay,
  newDelay,
) where

import Control.Concurrent
import Control.Concurrent.STM
import Control.Monad.Catch
import Data.Heap
import Data.Time.Clock (UTCTime, diffUTCTime, getCurrentTime)
import Data.Foldable (toList)
import Quasar.Async.STMHelper
import Quasar.Async
import Quasar.Awaitable
import Quasar.Exceptions
import Quasar.Monad
import Quasar.Prelude
import Quasar.Resources


-- | Signals that a timer was cancelled instead of running to completion.
--
-- It is stored as the failure of a timer's completion variable when the timer
-- is disposed before it fires, and it is thrown when awaiting a disposed 'Delay'.
data TimerCancelled = TimerCancelled
  deriving stock (Eq, Show)

instance Exception TimerCancelled


-- | A one-shot timer that is queued on a 'TimerScheduler'.
data Timer = Timer {
  -- | Identity of the timer; equality between timers is decided by this key.
  key :: Unique,
  -- | Point in time at which the timer is due; ordering between timers is decided by this value.
  time :: UTCTime,
  -- | Filled with @()@ when the timer fires, or failed with 'TimerCancelled' when it is disposed first.
  completed :: AsyncVar (),
  -- | Disposer that cancels the timer.
  disposer :: Disposer,
  -- | The scheduler this timer was created on.
  scheduler :: TimerScheduler
}

-- | Two timers are equal exactly when they carry the same unique key.
instance Eq Timer where
  Timer{key = lhsKey} == Timer{key = rhsKey} = lhsKey == rhsKey

-- | Timers are ordered by their due time only, so a min-heap yields the
-- earliest timer first. Note that equality is decided by the key instead,
-- so two distinct timers with the same due time compare as 'EQ' here.
instance Ord Timer where
  compare lhs rhs = compare (time lhs) (time rhs)

-- | A timer's resource handle is the disposer stored in its record.
instance Resource Timer where
  getDisposer Timer{disposer = timerDisposer} = timerDisposer
-- | Awaiting a timer means awaiting its completion variable.
instance IsAwaitable () Timer where
  toAwaitable timer = toAwaitable (completed timer)


-- | Shared state of a timer scheduler together with the handle of its worker thread.
data TimerScheduler = TimerScheduler {
  -- | Pending timers, ordered by due time. The variable is left empty once
  -- the scheduler thread has shut down.
  heap :: TMVar (Heap Timer),
  -- | Number of timers that have been created and have neither fired nor been cancelled.
  activeCount :: TVar Int,
  -- | Number of cancelled timers that have not yet been removed from the heap.
  cancelledCount :: TVar Int,
  -- | Disposer of the scheduler thread.
  disposer :: Disposer,
  -- | Worker handed to the disposer of every timer created on this scheduler.
  ioWorker :: TIOWorker,
  -- | Exception channel handed to the disposer of every timer created on this scheduler.
  exceptionChannel :: ExceptionChannel
}

-- | A scheduler's resource handle is the disposer stored in its record.
instance Resource TimerScheduler where
  getDisposer TimerScheduler{disposer = schedulerDisposer} = schedulerDisposer

-- | Thrown when a timer is created on a scheduler whose heap variable is
-- empty, which is the state the scheduler thread leaves behind when it ends.
data TimerSchedulerDisposed = TimerSchedulerDisposed
  deriving stock (Eq, Show)

instance Exception TimerSchedulerDisposed

-- | Creates a new 'TimerScheduler' with an empty timer heap and zeroed
-- counters, and starts its scheduler thread.
--
-- The scheduler record and its thread refer to each other: the thread needs
-- the record, and the record stores the thread's disposer. The knot is tied
-- with 'mfix', so the thread start-up must not force the record eagerly.
newTimerScheduler :: (MonadQuasar m, MonadIO m) => m TimerScheduler
newTimerScheduler = liftQuasarIO do
  timerHeap <- liftIO (newTMVarIO empty)
  active <- liftIO (newTVarIO 0)
  cancelled <- liftIO (newTVarIO 0)
  worker <- askIOWorker
  channel <- askExceptionChannel
  mfix \self -> do
    threadDisposer <- startSchedulerThread self
    pure TimerScheduler {
      heap = timerHeap,
      activeCount = active,
      cancelledCount = cancelled,
      disposer = threadDisposer,
      ioWorker = worker,
      exceptionChannel = channel
    }

-- | Starts the background thread that drives a 'TimerScheduler' and returns
-- the disposer of that thread.
--
-- The thread repeatedly looks at the earliest timer in the heap, sleeps until
-- it is due (in slices of at most 60 seconds, waking early when the heap's
-- minimum is no longer that timer) and then fires it. When the thread ends for
-- any reason, the heap variable is emptied and every timer that was still
-- queued is disposed.
--
-- The @scheduler@ argument is only accessed through lazily evaluated field
-- selectors, because 'newTimerScheduler' passes in the record while it is
-- still being constructed via 'mfix'. Do not pattern match on it here.
startSchedulerThread :: TimerScheduler -> QuasarIO Disposer
startSchedulerThread scheduler = getDisposer <$> async (schedulerThread `finally` liftIO cancelAll)
  where
    heap' :: TMVar (Heap Timer)
    heap' = heap scheduler
    activeCount' = activeCount scheduler
    cancelledCount' = cancelledCount scheduler

    -- Main loop: pick the earliest timer, then either fire it or wait for it.
    schedulerThread :: QuasarIO ()
    schedulerThread = forever do

      -- Get next timer (blocks when heap is empty)
      nextTimer <- liftIO $ atomically do
        mNext <- uncons <$> readTMVar heap'
        case mNext of
          Nothing -> retry
          Just (timer, _) -> pure timer

      now <- liftIO getCurrentTime

      -- TODO sleep using Posix/Linux create_timer using CLOCK_REALTIME
      let timeUntil = diffUTCTime (time nextTimer) now
      if
        | timeUntil <= 0 -> liftIO $ fireTimers now
        | timeUntil < 60 -> wait nextTimer (ceiling $ toRational timeUntil * 1000000)
        -- Sleep at most 60 seconds at a time; the loop re-reads the clock afterwards.
        | otherwise -> wait nextTimer (60 * 1000000)

    -- Blocks until the given number of microseconds has passed or until the
    -- heap's minimum is no longer `nextTimer`, whichever happens first.
    -- NOTE(review): the delay is not disposed when the heap changes first;
    -- presumably it just runs out on its own - confirm this is intended.
    wait :: Timer -> Int -> QuasarIO ()
    wait nextTimer microseconds = do
      delay <- newDelay microseconds
      awaitAny2 (await delay) nextTimerChanged
      where
        nextTimerChanged :: Awaitable ()
        nextTimerChanged = unsafeAwaitSTM do
          -- `uncons` is used instead of the partial `Data.Heap.minimum`, so an
          -- empty heap counts as "changed" rather than raising an error.
          mMin <- uncons <$> readTMVar heap'
          case mMin of
            Just (minTimer, _) | minTimer == nextTimer -> retry
            _ -> pure ()

    -- Fires the earliest timer if it is due at `now`, then rebuilds the heap
    -- when it holds more cancelled than active timers. Further due timers are
    -- picked up by the next iterations of the scheduler loop.
    fireTimers :: UTCTime -> IO ()
    fireTimers now = atomically do
      putTMVar heap' =<< go =<< takeTMVar heap'
      doCleanup <- liftA2 (>) (readTVar cancelledCount') (readTVar activeCount')
      when doCleanup cleanup
      where
        go :: Heap Timer -> STM (Heap Timer)
        go timers =
          case uncons timers of
            Just (timer, others) | time timer <= now -> do
              -- Complete the timer before dropping it from the heap; without
              -- this call a due timer would vanish without ever firing.
              fireTimer timer
              pure others
            _ -> pure timers

    -- Completes a timer that has been removed from the heap and adjusts the
    -- counters: a timer that could still be completed was active, one that
    -- could not had already been cancelled.
    fireTimer :: Timer -> STM ()
    fireTimer Timer{completed, disposer} = do
      result <- putAsyncVarSTM completed ()
      modifyTVar' (if result then activeCount' else cancelledCount') (+ (-1))
      disposeEventuallySTM_ disposer

    -- Rebuilds the heap from the timers that have not been cancelled.
    cleanup :: STM ()
    cleanup = putTMVar heap' . fromList =<< mapMaybeM cleanupTimer . toList =<< takeTMVar heap'

    -- Returns `Nothing` for a cancelled timer (and accounts for its removal),
    -- `Just` the timer otherwise.
    -- NOTE(review): relies on `readAsyncVarSTM` rethrowing the stored
    -- 'TimerCancelled' failure and retrying while the variable is unset - confirm.
    cleanupTimer :: Timer -> STM (Maybe Timer)
    cleanupTimer timer = do
      cancelled <- ((False <$ readAsyncVarSTM (completed timer)) `catch` \TimerCancelled -> pure True) `orElse` pure False
      if cancelled
        then do
          modifyTVar' cancelledCount' (+ (-1))
          pure Nothing
        else pure $ Just timer

    -- Empties the heap variable (it is not refilled, so later timer creation
    -- fails) and disposes every timer that was still queued.
    cancelAll :: IO ()
    cancelAll = do
      timers <- atomically $ takeTMVar heap'
      mapM_ dispose timers

-- | Creates a timer on the given scheduler that is due at the given time and
-- passes it through 'registerNewResource'.
--
-- NOTE(review): presumably this ties the timer's lifetime to the surrounding
-- resource context - confirm against 'registerNewResource'.
newTimer :: (MonadQuasar m, MonadIO m) => TimerScheduler -> UTCTime -> m Timer
newTimer scheduler time = registerNewResource (newUnmanagedTimer scheduler time)

-- | Creates a timer that is due at the given time and queues it on the
-- scheduler, without registering it as a resource anywhere.
--
-- The timer is inserted into the scheduler's heap and counted as active in a
-- single STM transaction. Throws 'TimerSchedulerDisposed' when the scheduler's
-- heap variable is empty.
newUnmanagedTimer :: MonadIO m => TimerScheduler -> UTCTime -> m Timer
newUnmanagedTimer scheduler time = liftIO do
  key <- newUnique
  completed <- newAsyncVar
  atomically do
    disposer <- newUnmanagedSTMDisposerSTM (disposeFn completed) (ioWorker scheduler) (exceptionChannel scheduler)
    let timer = Timer { key, time, completed, disposer, scheduler }
    tryTakeTMVar (heap scheduler) >>= \case
      Just timers -> putTMVar (heap scheduler) (insert timer timers)
      Nothing -> throwM TimerSchedulerDisposed
    modifyTVar' (activeCount scheduler) (+ 1)
    -- The transaction has to yield the timer; ending on the counter update
    -- would produce `()` and contradict the declared result type.
    pure timer
  where
    -- Cancels the timer: fails the completion variable with 'TimerCancelled'
    -- and, only if that actually changed the variable, moves the timer from
    -- the active to the cancelled count.
    disposeFn :: AsyncVar () -> STM ()
    disposeFn completed = do
      cancelled <- failAsyncVarSTM completed TimerCancelled
      when cancelled do
        modifyTVar' (activeCount scheduler) (+ (-1))
        modifyTVar' (cancelledCount scheduler) (+ 1)

-- | Blocks until the given point in time, using a temporary timer on the
-- given scheduler. If waiting is aborted by an exception, the timer is disposed.
sleepUntil :: MonadIO m => TimerScheduler -> UTCTime -> m ()
sleepUntil scheduler time = liftIO (bracketOnError createTimer dispose await)
  where
    createTimer = newUnmanagedTimer scheduler time
-- | A delay that can be awaited successfully after a given number of microseconds.
--
-- Based on `threadDelay`. Provides an `IsAwaitable` and a `Resource` instance.
newtype Delay = Delay (Async ())
  deriving newtype Resource

-- | Awaiting a delay awaits the underlying task; a disposed task is reported
-- as 'TimerCancelled' instead of 'AsyncDisposed'.
instance IsAwaitable () Delay where
  toAwaitable (Delay inner) = catch (toAwaitable inner) onDisposed
    where
      onDisposed AsyncDisposed = throwM TimerCancelled
-- | Starts an asynchronous task that sleeps for the given number of
-- microseconds (via 'threadDelay') and wraps it as a 'Delay'.
newDelay :: MonadQuasar m => Int -> m Delay
newDelay microseconds = fmap Delay (async (liftIO $ threadDelay microseconds))


-- | Monadic version of @mapMaybe@: runs the action on every element from left
-- to right and keeps the results that are 'Just'.
--
-- From package `extra`
mapMaybeM :: Monad m => (a -> m (Maybe b)) -> [a] -> m [b]
mapMaybeM op = foldr step (pure [])
  where
    -- Run the action for this element first, then the rest of the list;
    -- prepend the result only when the action produced a value.
    step x rest = op x >>= maybe rest (\kept -> (kept :) <$> rest)