Newer
Older
{-# LANGUAGE MultiWayIf #-}
module Quasar.Timer (
Timer,
newTimer,
sleepUntil,
TimerScheduler,
newTimerScheduler,
TimerCancelled,
Delay,
newDelay,
) where
import Control.Concurrent
import Control.Concurrent.STM
import Control.Monad.Catch
import Data.Heap
import Quasar.Exceptions
import Quasar.Monad
-- | Exception signalling that a timer (or 'Delay') was cancelled before it
-- completed. In this module it is stored as the failure of a timer's
-- @completed@ variable when the timer is disposed.
data TimerCancelled
  = TimerCancelled
  deriving stock (Eq, Show)

instance Exception TimerCancelled
-- | A one-shot timer that belongs to a 'TimerScheduler'.
data Timer = Timer {
  -- | Unique identity of the timer; the only field compared by the 'Eq' instance.
  key :: Unique,
  -- | Point in time at which the timer is due; the only field compared by the
  -- 'Ord' instance (which determines the position in the scheduler's heap).
  time :: UTCTime,
  -- | Filled with @()@ when the timer fires, or failed with 'TimerCancelled'
  -- when the timer is disposed first.
  completed :: AsyncVar (),
  -- | Disposer of the timer. The field is required by the record patterns and
  -- the record construction elsewhere in this module (@Timer{disposer}@).
  disposer :: Disposer,
  -- | The scheduler this timer was created for.
  scheduler :: TimerScheduler
}
-- | Two timers are equal exactly when their unique 'key's are equal; the due
-- time is not taken into account.
instance Eq Timer where
  (==) lhs rhs = key lhs == key rhs
-- | Timers are ordered by their due 'time' only. Note that this is
-- intentionally coarser than the 'Eq' instance, which compares 'key's.
instance Ord Timer where
  compare lhs rhs = compare (time lhs) (time rhs)
-- | A timer is a resource; its disposer is the one stored in the record.
instance Resource Timer where
getDisposer Timer{disposer} = disposer
-- Awaiting a timer means awaiting its @completed@ variable.
-- NOTE(review): `toAwaitable` presumably belongs to a separate awaitable
-- instance for `Timer` whose header line is not visible here — confirm.
toAwaitable Timer{completed} = toAwaitable completed
-- | Shared state of a timer scheduler. Timers are created against a scheduler
-- with 'newTimer' and fired by the scheduler's background thread.
data TimerScheduler = TimerScheduler {
  -- | Pending timers. The scheduler thread empties this variable for good when
  -- it shuts down, so timer creation treats an empty variable as "disposed".
  heap :: TMVar (Heap Timer),
  -- | Counter that is decremented whenever a timer fires or is cancelled.
  activeCount :: TVar Int,
  -- | Counter of cancelled timers; once it exceeds 'activeCount' the scheduler
  -- thread removes cancelled timers from the heap.
  cancelledCount :: TVar Int,
  -- | Disposer of the scheduler thread.
  disposer :: Disposer,
  -- | IO worker handed to the disposers of newly created timers.
  ioWorker :: TIOWorker,
  -- | Exception channel handed to the disposers of newly created timers.
  exceptionChannel :: ExceptionChannel
}
-- | Disposing a scheduler means disposing the disposer stored in its record.
instance Resource TimerScheduler where
  getDisposer TimerScheduler{disposer = schedulerDisposer} = schedulerDisposer
-- | Thrown when a timer is created on a scheduler whose heap variable has
-- already been emptied (i.e. the scheduler thread has shut down).
data TimerSchedulerDisposed
  = TimerSchedulerDisposed
  deriving stock (Eq, Show)

instance Exception TimerSchedulerDisposed
-- | Create a new timer scheduler and start its background thread.
--
-- The scheduler starts with an empty heap and both counters at zero. The IO
-- worker and exception channel are taken from the surrounding Quasar context.
newTimerScheduler :: (MonadQuasar m, MonadIO m) => m TimerScheduler
newTimerScheduler = liftQuasarIO do
  timerHeap <- liftIO (newTMVarIO empty)
  active <- liftIO (newTVarIO 0)
  cancelled <- liftIO (newTVarIO 0)
  worker <- askIOWorker
  excChannel <- askExceptionChannel
  -- The scheduler record contains the disposer of its own thread, while the
  -- thread needs the record; `mfix` ties that knot.
  mfix \self -> do
    threadDisposer <- startSchedulerThread self
    pure TimerScheduler {
      heap = timerHeap,
      activeCount = active,
      cancelledCount = cancelled,
      disposer = threadDisposer,
      ioWorker = worker,
      exceptionChannel = excChannel
    }
-- | Start the scheduler's background task and return its disposer.
--
-- The task runs `schedulerThread`; `cancelAll` is run via `finally` when it ends.
--
-- NOTE(review): several lines of this definition are not visible here (the
-- `where` keyword, the binding of `heap'`, the bindings of `nextTimer` and
-- `now`, and the head of the `wait` helper). The comments below describe only
-- what the visible lines show.
startSchedulerThread :: TimerScheduler -> QuasarIO Disposer
startSchedulerThread scheduler = getDisposer <$> async (schedulerThread `finally` liftIO cancelAll)
-- Local shorthands for the scheduler's shared state.
heap' :: TMVar (Heap Timer)
activeCount' = activeCount scheduler
cancelledCount' = cancelledCount scheduler
-- Main loop: repeatedly look at the next timer and either fire or wait.
schedulerThread = forever do
-- Get next timer (blocks when heap is empty)
Nothing -> retry
Just (timer, _) -> pure timer
-- TODO sleep using POSIX/Linux `timer_create` with CLOCK_REALTIME
let timeUntil = diffUTCTime (time nextTimer) now
-- Due timers are fired immediately. Otherwise the loop waits for the
-- remaining time (rounded up), but never for more than 60 per iteration.
-- The factor 1000000 presumably converts seconds to microseconds — confirm.
if
| timeUntil <= 0 -> liftIO $ fireTimers now
| timeUntil < 60 -> wait nextTimer (ceiling $ toRational timeUntil * 1000000)
| otherwise -> wait nextTimer (60 * 1000000)
-- Tail of the `wait` helper: wake up when either the delay has elapsed or
-- `nextTimerChanged` completes.
awaitAny2 (await delay) nextTimerChanged
where
nextTimerChanged :: Awaitable ()
nextTimerChanged = unsafeAwaitSTM do
-- Reads the current minimum of the heap; the rest of this STM block is not
-- visible here.
minTimer <- Data.Heap.minimum <$> readTMVar heap'
-- Process the heap in a single STM transaction: the heap is taken out of its
-- variable, passed through `go`, and the result is put back. Afterwards
-- `cleanup` runs if the cancelled counter is greater than the active counter.
fireTimers :: UTCTime -> IO ()
fireTimers now = atomically do
putTMVar heap' =<< go =<< takeTMVar heap'
doCleanup <- liftA2 (>) (readTVar cancelledCount') (readTVar activeCount')
when doCleanup cleanup
where
-- `go` splits one timer off the heap with `uncons` (presumably the earliest
-- one, given the `Ord Timer` instance — confirm). An empty heap is returned
-- unchanged.
-- NOTE(review): the body of the `Just` branch is not visible here.
go :: Heap Timer -> STM (Heap Timer)
go timers = do
case uncons timers of
Nothing -> pure timers
Just (timer, others) -> do
-- Complete a single timer and adjust the bookkeeping counters.
--
-- `putAsyncVarSTM` reports whether the value was actually stored. If it was,
-- the timer was still active; otherwise the cancelled counter is decremented
-- instead (the timer is then presumably one that was cancelled before — confirm).
--
-- NOTE(review): `disposer` is bound but not used in the visible lines — confirm
-- whether a disposal step is missing here.
fireTimer :: Timer -> STM ()
fireTimer Timer{completed, disposer} = do
  fired <- putAsyncVarSTM completed ()
  -- Strict update: the counters are only read occasionally, so a lazy
  -- `modifyTVar` would pile up unevaluated thunks in the TVar.
  modifyTVar' (if fired then activeCount' else cancelledCount') (subtract 1)
-- Rebuild the heap from only those timers that `cleanupTimer` decides to keep.
cleanup = do
  timers <- takeTMVar heap'
  survivors <- mapMaybeM cleanupTimer (toList timers)
  putTMVar heap' (fromList survivors)
-- Decide whether a timer stays in the heap during cleanup.
--
-- Returns `Nothing` (and decrements the cancelled counter) when reading the
-- timer's `completed` variable raises `TimerCancelled`; returns `Just timer`
-- otherwise, including when the read does not complete and the `orElse`
-- fallback is taken.
cleanupTimer :: Timer -> STM (Maybe Timer)
cleanupTimer timer = do
  cancelled <- ((False <$ readAsyncVarSTM (completed timer)) `catch` \TimerCancelled -> pure True) `orElse` pure False
  if cancelled
    -- Strict update so the counter never accumulates unevaluated thunks.
    then Nothing <$ modifyTVar' cancelledCount' (subtract 1)
    else pure (Just timer)
-- Dispose every timer that is still in the heap.
--
-- The heap variable is taken and deliberately not refilled: `newUnmanagedTimer`
-- treats an empty heap variable as a disposed scheduler.
cancelAll :: IO ()
cancelAll = mapM_ dispose =<< atomically (takeTMVar heap')
-- | Create a timer on the given scheduler that is due at the given time, and
-- register it via 'registerNewResource'.
newTimer :: (MonadQuasar m, MonadIO m) => TimerScheduler -> UTCTime -> m Timer
newTimer scheduler time =
  registerNewResource (newUnmanagedTimer scheduler time)
-- | Create a timer and insert it into the scheduler's heap, without
-- registering it as a resource anywhere.
--
-- Throws 'TimerSchedulerDisposed' if the scheduler's heap variable is empty,
-- which is the state the scheduler thread leaves behind when it shuts down.
--
-- Disposing the returned timer fails its @completed@ variable with
-- 'TimerCancelled' (if it has not completed yet) and moves it from the active
-- count to the cancelled count.
newUnmanagedTimer :: MonadIO m => TimerScheduler -> UTCTime -> m Timer
newUnmanagedTimer scheduler time = liftIO do
  key <- newUnique
  completed <- newAsyncVar
  atomically do
    disposer <- newUnmanagedSTMDisposerSTM (disposeFn completed) (ioWorker scheduler) (exceptionChannel scheduler)
    let timer = Timer { key, time, completed, disposer, scheduler }
    tryTakeTMVar (heap scheduler) >>= \case
      Just timers -> putTMVar (heap scheduler) (insert timer timers)
      Nothing -> throwM TimerSchedulerDisposed
    -- Count the timer as active. Firing and cancelling both decrement this
    -- counter, so without the increment it would drift below zero.
    modifyTVar' (activeCount scheduler) (+ 1)
    -- The transaction has to yield the timer to match the declared result type.
    pure timer
  where
    -- Dispose action of the timer; the counters are only touched when the
    -- cancellation actually took effect.
    disposeFn :: AsyncVar () -> STM ()
    disposeFn var = do
      cancelled <- failAsyncVarSTM var TimerCancelled
      when cancelled do
        -- Strict updates keep the counters from accumulating thunks.
        modifyTVar' (activeCount scheduler) (subtract 1)
        modifyTVar' (cancelledCount scheduler) (+ 1)
-- | Block until the given point in time, using a temporary timer on the given
-- scheduler. The timer is disposed if waiting is aborted by an exception.
sleepUntil :: MonadIO m => TimerScheduler -> UTCTime -> m ()
sleepUntil scheduler time = liftIO (bracketOnError acquire dispose await)
  where
    acquire :: IO Timer
    acquire = newUnmanagedTimer scheduler time
-- | A delay that can be awaited successfully once a given number of
-- microseconds has elapsed.
--
-- Based on `threadDelay`. Provides an `IsAwaitable` and an `IsDisposable` instance.
--
-- NOTE(review): the instance declarations themselves are not visible here; the
-- `toAwaitable` line below presumably belongs to the awaitable instance — confirm.
newtype Delay = Delay (Async ())
-- Awaiting a delay awaits the wrapped task; `AsyncDisposed` is re-thrown as
-- `TimerCancelled`.
toAwaitable (Delay task) = toAwaitable task `catch` \AsyncDisposed -> throwM TimerCancelled
-- | Start an asynchronous task that runs 'threadDelay' for the given number of
-- microseconds and wrap it in a 'Delay'.
newDelay :: MonadQuasar m => Int -> m Delay
newDelay microseconds = fmap Delay (async sleepTask)
  where
    sleepTask = liftIO $ threadDelay microseconds
-- From package `extra`
--
-- | Monadic 'Data.Maybe.mapMaybe': run the action on every element from left
-- to right and collect the results that are 'Just', preserving their order.
mapMaybeM :: Monad m => (a -> m (Maybe b)) -> [a] -> m [b]
mapMaybeM _ [] = pure []
mapMaybeM op (x : rest) = do
  outcome <- op x
  kept <- mapMaybeM op rest
  pure (maybe kept (: kept) outcome)