diff --git a/quasar.cabal b/quasar.cabal index ca9695bfddbb26dd15b45c61dc28529acb14f5c2..afea5a0e1a76e732d6da971e40ea3906dfb67b27 100644 --- a/quasar.cabal +++ b/quasar.cabal @@ -87,6 +87,7 @@ library Quasar Quasar.Async Quasar.Async.Unmanaged + Quasar.Async.STMHelper Quasar.Awaitable Quasar.Disposable Quasar.Exceptions diff --git a/src/Quasar/Async/STMHelper.hs b/src/Quasar/Async/STMHelper.hs new file mode 100644 index 0000000000000000000000000000000000000000..11dccad71cc6fff546c4cebb5f4a333c7c814c1f --- /dev/null +++ b/src/Quasar/Async/STMHelper.hs @@ -0,0 +1,47 @@ +module Quasar.Async.STMHelper ( + TIOWorker, + newTIOWorker, + startTrivialIO, + startTrivialIO_, +) where + +import Control.Concurrent (forkIO) +import Control.Concurrent.STM +import Control.Exception (BlockedIndefinitelyOnSTM) +import Control.Monad.Catch +import Quasar.Awaitable +import Quasar.Exceptions +import Quasar.Prelude + + +newtype TIOWorker = TIOWorker (TMVar (IO ())) + + +startTrivialIO :: forall a. TIOWorker -> ExceptionChannel -> IO a -> STM (Awaitable a) +startTrivialIO (TIOWorker jobVar) exChan fn = do + resultVar <- newAsyncVarSTM + putTMVar jobVar $ job resultVar + pure $ toAwaitable resultVar + where + job :: AsyncVar a -> IO () + job resultVar = do + try fn >>= \case + Left ex -> do + atomically $ throwToExceptionChannel exChan ex + failAsyncVar_ resultVar $ toException $ AsyncException ex + Right result -> putAsyncVar_ resultVar result + +startTrivialIO_ :: forall a. TIOWorker -> ExceptionChannel -> IO a -> STM () +startTrivialIO_ x y z = void $ startTrivialIO x y z + + +newTIOWorker :: IO TIOWorker +newTIOWorker = do + jobVar <- newEmptyTMVarIO + void $ forkIO $ + handle + -- Relies on garbage collection to remove the thread when it is no longer needed + (\(_ :: BlockedIndefinitelyOnSTM) -> pure ()) + (forever $ join $ atomically $ takeTMVar jobVar) + + pure $ TIOWorker jobVar