diff --git a/quasar.cabal b/quasar.cabal index e0c9da8fda0f6cfea2faa5c847b6f5c98fea961d..c6e21de676c9dda422441b42b5cc5b7a69669afa 100644 --- a/quasar.cabal +++ b/quasar.cabal @@ -95,6 +95,7 @@ library Quasar.ResourceManager Quasar.Subscribable Quasar.Timer + Quasar.Utils.Concurrent Quasar.Utils.ExtraT hs-source-dirs: src diff --git a/src/Quasar/Async.hs b/src/Quasar/Async.hs index ef1f2f238b9271d8be1b9ea64817a2234c442aef..4a4976ba235efd2ef8e95f76698c9450d92a4bfe 100644 --- a/src/Quasar/Async.hs +++ b/src/Quasar/Async.hs @@ -11,22 +11,13 @@ module Quasar.Async ( IsAsyncContext(..), AsyncContext, unlimitedAsyncContext, - - -- * Unmanaged forking - forkTask, - forkTask_, - forkTaskWithUnmask, - forkTaskWithUnmask_, ) where -import Control.Concurrent (ThreadId, forkIOWithUnmask, throwTo) -import Control.Concurrent.STM -import Control.Monad.Catch import Control.Monad.Reader import Quasar.Awaitable -import Quasar.Disposable import Quasar.Prelude import Quasar.ResourceManager +import Quasar.Utils.Concurrent @@ -55,7 +46,7 @@ instance IsAsyncContext UnlimitedAsyncContext where resourceManager <- askResourceManager let asyncContext = unlimitedAsyncContext toAwaitable <$> registerNewResource do - forkTaskWithUnmask (\unmask -> runReaderT (runReaderT (action (liftUnmask unmask)) asyncContext) resourceManager) + unmanagedForkWithUnmask (\unmask -> runReaderT (runReaderT (action (liftUnmask unmask)) asyncContext) resourceManager) where liftUnmask :: (forall b. IO b -> IO b) -> ReaderT AsyncContext (ReaderT ResourceManager IO) a -> ReaderT AsyncContext (ReaderT ResourceManager IO) a liftUnmask unmask innerAction = do @@ -108,51 +99,3 @@ asyncWithUnmask_ action = void $ asyncWithUnmask action runUnlimitedAsync :: ReaderT AsyncContext m a -> m a runUnlimitedAsync action = do runReaderT action unlimitedAsyncContext - - - -forkTask :: MonadIO m => IO a -> m (Task a) -forkTask action = forkTaskWithUnmask \unmask -> unmask action - -forkTask_ :: MonadIO m => IO () -> m Disposable -forkTask_ action = toDisposable <$> forkTask action - -forkTaskWithUnmask :: MonadIO m => ((forall b. IO b -> IO b) -> IO a) -> m (Task a) -forkTaskWithUnmask action = do - liftIO $ mask_ do - resultVar <- newAsyncVar - threadIdVar <- newEmptyTMVarIO - - disposable <- newDisposable $ disposeTask threadIdVar resultVar - - onException - do - atomically . putTMVar threadIdVar . Just =<< - forkIOWithUnmask \unmask -> do - result <- try $ catch - do action unmask - \CancelTask -> throwIO TaskDisposed - - putAsyncVarEither_ resultVar result - - -- Thread has completed work, "disarm" the disposable and fire it - void $ atomically $ swapTMVar threadIdVar Nothing - disposeAndAwait disposable - - do atomically $ putTMVar threadIdVar Nothing - - pure $ Task disposable (toAwaitable resultVar) - where - disposeTask :: TMVar (Maybe ThreadId) -> AsyncVar r -> IO (Awaitable ()) - disposeTask threadIdVar resultVar = mask_ do - -- Blocks until the thread is forked - atomically (swapTMVar threadIdVar Nothing) >>= \case - -- Thread completed or initialization failed - Nothing -> pure () - Just threadId -> throwTo threadId CancelTask - - -- Wait for task completion or failure. Tasks must not ignore `CancelTask` or this will hang. - pure $ void (toAwaitable resultVar) `catchAll` const (pure ()) - -forkTaskWithUnmask_ :: MonadIO m => ((forall b. IO b -> IO b) -> IO ()) -> m Disposable -forkTaskWithUnmask_ action = toDisposable <$> forkTaskWithUnmask action diff --git a/src/Quasar/Timer.hs b/src/Quasar/Timer.hs index 4d32cfaa337d17fa0b2d647fe71b9c3ad22c52c6..e43934a01274c6dd58d8612d61e9aa1fa869e739 100644 --- a/src/Quasar/Timer.hs +++ b/src/Quasar/Timer.hs @@ -20,11 +20,11 @@ import Control.Monad.Catch import Data.Heap import Data.Time.Clock (UTCTime, diffUTCTime, getCurrentTime) import Data.Foldable (toList) -import Quasar.Async import Quasar.Awaitable import Quasar.Disposable import Quasar.Prelude import Quasar.ResourceManager +import Quasar.Utils.Concurrent data TimerCancelled = TimerCancelled @@ -96,7 +96,7 @@ startSchedulerThread :: TimerScheduler -> IO () startSchedulerThread scheduler = do mask_ do onResourceManager (resourceManager scheduler) do - registerDisposable =<< forkTask schedulerThread + registerDisposable =<< unmanagedFork schedulerThread where resourceManager' :: ResourceManager resourceManager' = resourceManager scheduler @@ -191,7 +191,7 @@ instance IsAwaitable () Delay where newDelay :: MonadResourceManager m => Int -> m Delay newDelay microseconds = mask_ do - delay <- Delay <$> forkTask (liftIO (threadDelay microseconds)) + delay <- Delay <$> unmanagedFork (liftIO (threadDelay microseconds)) registerDisposable delay pure delay diff --git a/src/Quasar/Utils/Concurrent.hs b/src/Quasar/Utils/Concurrent.hs new file mode 100644 index 0000000000000000000000000000000000000000..1d143fdafe6f92da5924a5694f96205596b628cf --- /dev/null +++ b/src/Quasar/Utils/Concurrent.hs @@ -0,0 +1,64 @@ +module Quasar.Utils.Concurrent ( + unmanagedFork, + unmanagedFork_, + unmanagedForkWithUnmask, + unmanagedForkWithUnmask_, +)where + + +import Control.Concurrent (ThreadId, forkIOWithUnmask, throwTo) +import Control.Concurrent.STM +import Control.Monad.Catch +import Quasar.Awaitable +import Quasar.Disposable +import Quasar.Prelude + + +unmanagedFork :: MonadIO m => IO a -> m (Task a) +unmanagedFork action = unmanagedForkWithUnmask \unmask -> unmask action + +unmanagedFork_ :: MonadIO m => IO () -> m Disposable +unmanagedFork_ action = toDisposable <$> unmanagedFork action + +unmanagedForkWithUnmask :: MonadIO m => ((forall b. IO b -> IO b) -> IO a) -> m (Task a) +unmanagedForkWithUnmask action = do + liftIO $ mask_ do + resultVar <- newAsyncVar + threadIdVar <- newEmptyTMVarIO + + disposable <- newDisposable $ disposeTask threadIdVar resultVar + + onException + do + atomically . putTMVar threadIdVar . Just =<< + forkIOWithUnmask \unmask -> do + result <- try $ catch + do action unmask + \CancelTask -> throwIO TaskDisposed + + putAsyncVarEither_ resultVar result + + -- The `action` has completed its work. + -- "disarm" the disposer thread ... + void $ atomically $ swapTMVar threadIdVar Nothing + -- .. then fire the disposable to release resources (the disposer thread) and to signal that this thread is + -- disposed. + await =<< dispose disposable + + do atomically $ putTMVar threadIdVar Nothing + + pure $ Task disposable (toAwaitable resultVar) + where + disposeTask :: TMVar (Maybe ThreadId) -> AsyncVar r -> IO (Awaitable ()) + disposeTask threadIdVar resultVar = mask_ do + -- Blocks until the thread is forked + atomically (swapTMVar threadIdVar Nothing) >>= \case + -- Thread completed or initialization failed + Nothing -> pure () + Just threadId -> throwTo threadId CancelTask + + -- Wait for task completion or failure. Tasks must not ignore `CancelTask` or this will hang. + pure $ void (toAwaitable resultVar) `catchAll` const (pure ()) + +unmanagedForkWithUnmask_ :: MonadIO m => ((forall b. IO b -> IO b) -> IO ()) -> m Disposable +unmanagedForkWithUnmask_ action = toDisposable <$> unmanagedForkWithUnmask action