diff --git a/src/Quasar/Async/STMHelper.hs b/src/Quasar/Async/STMHelper.hs index 11dccad71cc6fff546c4cebb5f4a333c7c814c1f..d541072d2b86a8dd3fa77d22dcbd362c1189861a 100644 --- a/src/Quasar/Async/STMHelper.hs +++ b/src/Quasar/Async/STMHelper.hs @@ -14,13 +14,13 @@ import Quasar.Exceptions import Quasar.Prelude -newtype TIOWorker = TIOWorker (TMVar (IO ())) +newtype TIOWorker = TIOWorker (TQueue (IO ())) startTrivialIO :: forall a. TIOWorker -> ExceptionChannel -> IO a -> STM (Awaitable a) -startTrivialIO (TIOWorker jobVar) exChan fn = do +startTrivialIO (TIOWorker jobQueue) exChan fn = do resultVar <- newAsyncVarSTM - putTMVar jobVar $ job resultVar + writeTQueue jobQueue $ job resultVar pure $ toAwaitable resultVar where job :: AsyncVar a -> IO () @@ -37,11 +37,11 @@ startTrivialIO_ x y z = void $ startTrivialIO x y z newTIOWorker :: IO TIOWorker newTIOWorker = do - jobVar <- newEmptyTMVarIO + jobQueue <- newTQueueIO void $ forkIO $ - handle + catch + (forever $ join $ atomically $ readTQueue jobQueue) -- Relies on garbage collection to remove the thread when it is no longer needed (\(_ :: BlockedIndefinitelyOnSTM) -> pure ()) - (forever $ join $ atomically $ takeTMVar jobVar) - pure $ TIOWorker jobVar + pure $ TIOWorker jobQueue