From eeaa314335ffaabada38452e50d57ba849e65501 Mon Sep 17 00:00:00 2001
From: Jens Nolte <git@queezle.net>
Date: Sun, 29 Aug 2021 01:13:46 +0200
Subject: [PATCH] Clean up cancelled timers

---
 src/Quasar/Awaitable.hs |  6 ++++++
 src/Quasar/Timer.hs     | 28 ++++++++++++++++++++++++++--
 2 files changed, 32 insertions(+), 2 deletions(-)

diff --git a/src/Quasar/Awaitable.hs b/src/Quasar/Awaitable.hs
index 1292986..15399c4 100644
--- a/src/Quasar/Awaitable.hs
+++ b/src/Quasar/Awaitable.hs
@@ -40,6 +40,7 @@ module Quasar.Awaitable (
   failAsyncVarSTM,
   failAsyncVarSTM_,
   putAsyncVarEitherSTM_,
+  readAsyncVarSTM,
 
   -- * Implementation helpers
   MonadQuerySTM(querySTM),
@@ -286,6 +287,11 @@ putAsyncVarEitherSTM :: AsyncVar a -> Either SomeException a -> STM Bool
 putAsyncVarEitherSTM (AsyncVar var) = tryPutTMVar var
 
 
+-- | Get the value of an `AsyncVar` in `STM`. Will retry until the AsyncVar is fulfilled.
+readAsyncVarSTM :: AsyncVar a -> STM a
+readAsyncVarSTM (AsyncVar var) = either throwM pure =<< readTMVar var
+
+
 putAsyncVar :: MonadIO m => AsyncVar a -> a -> m Bool
 putAsyncVar var = putAsyncVarEither var . Right
 
diff --git a/src/Quasar/Timer.hs b/src/Quasar/Timer.hs
index 90ecc50..ad20f2f 100644
--- a/src/Quasar/Timer.hs
+++ b/src/Quasar/Timer.hs
@@ -22,6 +22,7 @@ import Data.Heap
 import Data.Ord (comparing)
 import Data.Time.Clock (UTCTime, getCurrentTime, diffUTCTime, utctDayTime, addUTCTime)
 import Data.Time.Clock.POSIX (posixSecondsToUTCTime)
+import Data.Foldable (toList)
 import Quasar.Async
 import Quasar.Awaitable
 import Quasar.Disposable
@@ -111,6 +112,8 @@ startSchedulerThread scheduler = do
     resourceManager' = resourceManager scheduler
     heap' :: TVar (Heap Timer)
     heap' = heap scheduler
+    activeCount' = activeCount scheduler
+    cancelledCount' = cancelledCount scheduler
 
     schedulerThread :: IO ()
     schedulerThread = forever do
@@ -143,6 +146,8 @@ startSchedulerThread scheduler = do
     fireTimers :: UTCTime -> IO ()
     fireTimers now = atomically do
       writeTVar heap' =<< go =<< readTVar heap'
+      doCleanup <- liftA2 (>) (readTVar cancelledCount') (readTVar activeCount')
+      when doCleanup cleanup
       where
         go :: Heap Timer -> STM (Heap Timer)
         go timers = do
@@ -152,10 +157,23 @@ startSchedulerThread scheduler = do
               if (time timer) <= now
                 then do
                   result <- putAsyncVarSTM (completed timer) ()
-                  modifyTVar ((if result then activeCount else cancelledCount) scheduler) (+ (-1))
+                  modifyTVar (if result then activeCount' else cancelledCount') (+ (-1))
                   pure others
                  else pure timers
 
+    cleanup :: STM ()
+    cleanup = writeTVar heap' . fromList =<< mapMaybeM cleanupTimer =<< (toList <$> readTVar heap')
+
+    cleanupTimer :: Timer -> STM (Maybe Timer)
+    cleanupTimer timer = do
+      cancelled <- ((False <$ readAsyncVarSTM (completed timer)) `catch` \TimerCancelled -> pure True) `orElse` pure False
+      if cancelled
+        then do
+          modifyTVar cancelledCount' (+ (-1))
+          pure Nothing
+        else pure $ Just timer
+
+
 
 newTimer :: TimerScheduler -> UTCTime -> IO Timer
 newTimer scheduler time = do
@@ -173,7 +191,6 @@ sleepUntil scheduler time = bracketOnError (newTimer scheduler time) disposeIO a
 
 
 
-
 -- | Can be awaited successfully after a given number of microseconds. Based on `threadDelay`, but provides an
 -- `IsAwaitable` and `IsDisposable` instance.
 newtype Delay = Delay (Task ())
@@ -184,3 +201,10 @@ instance IsAwaitable () Delay where
 
 newDelay :: ResourceManager -> Int -> IO Delay
 newDelay resourceManager microseconds = onResourceManager resourceManager $ Delay <$> async (liftIO (threadDelay microseconds))
+
+
+
+-- From package `extra`
+mapMaybeM :: Monad m => (a -> m (Maybe b)) -> [a] -> m [b]
+mapMaybeM op = foldr f (pure [])
+    where f x xs = do x <- op x; case x of Nothing -> xs; Just x -> do xs <- xs; pure $ x:xs
-- 
GitLab