diff --git a/quasar.cabal b/quasar.cabal index 310a6dbc041dae7d6edd38759f89c83ae58d8866..97a4dd453acfe2ce878efe737649f9322f88a58b 100644 --- a/quasar.cabal +++ b/quasar.cabal @@ -88,6 +88,7 @@ library Quasar.Async Quasar.Async.Unmanaged Quasar.Async.STMHelper + Quasar.Async.V2 Quasar.Awaitable Quasar.Disposable Quasar.Exceptions diff --git a/src/Quasar/Async.hs b/src/Quasar/Async.hs index 1827ee0cb805e6967a4019b2185c8a8681a8eb67..a1c4640718027bacdf8ecb435e5fe29adc137978 100644 --- a/src/Quasar/Async.hs +++ b/src/Quasar/Async.hs @@ -24,7 +24,6 @@ module Quasar.Async ( import Control.Monad.Catch import Control.Monad.Reader import Quasar.Async.Unmanaged -import Quasar.Awaitable import Quasar.Disposable import Quasar.Prelude import Quasar.ResourceManager diff --git a/src/Quasar/Async/STMHelper.hs b/src/Quasar/Async/STMHelper.hs index d541072d2b86a8dd3fa77d22dcbd362c1189861a..610168041121d010f213598977c5a934cb248689 100644 --- a/src/Quasar/Async/STMHelper.hs +++ b/src/Quasar/Async/STMHelper.hs @@ -1,11 +1,15 @@ module Quasar.Async.STMHelper ( TIOWorker, newTIOWorker, - startTrivialIO, - startTrivialIO_, + startShortIO, + startShortIO_, + fork, + fork_, + forkWithUnmask, + forkWithUnmask_, ) where -import Control.Concurrent (forkIO) +import Control.Concurrent (ThreadId, forkIO, forkIOWithUnmask) import Control.Concurrent.STM import Control.Exception (BlockedIndefinitelyOnSTM) import Control.Monad.Catch @@ -17,8 +21,8 @@ import Quasar.Prelude newtype TIOWorker = TIOWorker (TQueue (IO ())) -startTrivialIO :: forall a. TIOWorker -> ExceptionChannel -> IO a -> STM (Awaitable a) -startTrivialIO (TIOWorker jobQueue) exChan fn = do +startShortIO :: forall a. TIOWorker -> ExceptionChannel -> IO a -> STM (Awaitable a) +startShortIO (TIOWorker jobQueue) exChan fn = do resultVar <- newAsyncVarSTM writeTQueue jobQueue $ job resultVar pure $ toAwaitable resultVar @@ -31,8 +35,8 @@ startTrivialIO (TIOWorker jobQueue) exChan fn = do failAsyncVar_ resultVar $ toException $ AsyncException ex Right result -> putAsyncVar_ resultVar result -startTrivialIO_ :: forall a. TIOWorker -> ExceptionChannel -> IO a -> STM () -startTrivialIO_ x y z = void $ startTrivialIO x y z +startShortIO_ :: forall a. TIOWorker -> ExceptionChannel -> IO a -> STM () +startShortIO_ x y z = void $ startShortIO x y z newTIOWorker :: IO TIOWorker @@ -43,5 +47,24 @@ newTIOWorker = do (forever $ join $ atomically $ readTQueue jobQueue) -- Relies on garbage collection to remove the thread when it is no longer needed (\(_ :: BlockedIndefinitelyOnSTM) -> pure ()) - + pure $ TIOWorker jobQueue + + +fork :: TIOWorker -> ExceptionChannel -> IO () -> STM (Awaitable ThreadId) +fork worker exChan fn = forkWithUnmask worker exChan (\unmask -> unmask fn) + +fork_ :: TIOWorker -> ExceptionChannel -> IO () -> STM () +fork_ worker exChan fn = void $ fork worker exChan fn + + +forkWithUnmask :: TIOWorker -> ExceptionChannel -> ((forall a. IO a -> IO a) -> IO ()) -> STM (Awaitable ThreadId) +forkWithUnmask worker exChan fn = startShortIO worker exChan launcher + where + launcher :: IO ThreadId + launcher = mask_ $ forkIOWithUnmask wrappedFn + wrappedFn :: (forall a. IO a -> IO a) -> IO () + wrappedFn unmask = fn unmask `catchAll` \ex -> atomically (throwToExceptionChannel exChan ex) + +forkWithUnmask_ :: TIOWorker -> ExceptionChannel -> ((forall a. IO a -> IO a) -> IO ()) -> STM () +forkWithUnmask_ worker exChan fn = void $ forkWithUnmask worker exChan fn diff --git a/src/Quasar/Async/V2.hs b/src/Quasar/Async/V2.hs new file mode 100644 index 0000000000000000000000000000000000000000..28bc7978b4bd9770d831c89e9dfbebdf3f581dcc --- /dev/null +++ b/src/Quasar/Async/V2.hs @@ -0,0 +1,80 @@ +module Quasar.Async.V2 ( + Async, + async, + asyncWithUnmask, + + -- ** Unmanaged variants + unmanagedAsync, + unmanagedAsyncWithUnmask, +) where + +import Control.Concurrent (ThreadId) +import Control.Concurrent.STM +import Control.Exception (throwTo) +import Control.Monad.Catch +import Quasar.Async.STMHelper +import Quasar.Awaitable +import Quasar.Exceptions +import Quasar.Monad +import Quasar.Prelude +import Quasar.Resources.Disposer + + +data Async a = Async (Awaitable a) Disposer + +instance Resource (Async a) where + getDisposer (Async _ disposer) = disposer + +instance IsAwaitable a (Async a) where + toAwaitable (Async awaitable _) = awaitable + + +unmanagedAsync :: TIOWorker -> ExceptionChannel -> IO a -> STM (Async a) +unmanagedAsync worker exChan fn = unmanagedAsyncWithUnmask worker exChan \unmask -> unmask fn + +unmanagedAsyncWithUnmask :: forall a. TIOWorker -> ExceptionChannel -> ((forall b. IO b -> IO b) -> IO a) -> STM (Async a) +unmanagedAsyncWithUnmask worker exChan fn = do + key <- newUniqueSTM + resultVar <- newAsyncVarSTM + disposer <- mfix \disposer -> do + tidAwaitable <- forkWithUnmask worker exChan (runAndPut key resultVar disposer) + newPrimitiveDisposer worker exChan (disposeFn key resultVar tidAwaitable) + pure $ Async (toAwaitable resultVar) disposer + where + runAndPut :: Unique -> AsyncVar a -> Disposer -> (forall b. IO b -> IO b) -> IO () + runAndPut key resultVar disposer unmask = do + -- Called in masked state by `forkWithUnmask` + result <- try $ fn unmask + case result of + Left (fromException -> Just (CancelAsync ((== key) -> True))) -> + failAsyncVar_ resultVar AsyncDisposed + Left ex -> do + atomically (throwToExceptionChannel exChan ex) + `finally` do + failAsyncVar_ resultVar (AsyncException ex) + atomically $ disposeEventuallySTM_ disposer + Right retVal -> do + putAsyncVar_ resultVar retVal + atomically $ disposeEventuallySTM_ disposer + disposeFn :: Unique -> AsyncVar a -> Awaitable ThreadId -> IO (Awaitable ()) + disposeFn key resultVar tidAwaitable = do + -- Awaits forking of the thread, which should happen immediately (as long as the TIOWorker-invariant isn't broken elsewhere) + tid <- await tidAwaitable + -- `throwTo` should also happen immediately, as long as `uninterruptibleMask` isn't abused elsewhere + throwTo tid (CancelAsync key) + -- Considered complete once a result (i.e. success or failure) has been stored + pure (() <$ toAwaitable resultVar) + + +async :: MonadQuasar m => IO a -> m (Async a) +async fn = asyncWithUnmask ($ fn) + +asyncWithUnmask :: MonadQuasar m => ((forall b. IO b -> IO b) -> IO a) -> m (Async a) +asyncWithUnmask fn = do + worker <- askIOWorker + exChan <- askExceptionChannel + rm <- askResourceManager + runSTM do + as <- unmanagedAsyncWithUnmask worker exChan fn + attachResource rm as + pure as diff --git a/src/Quasar/Resources/Disposer.hs b/src/Quasar/Resources/Disposer.hs index 2083aaccef9237380d1d75ff7ee27abb24914012..3bf9d39912e6e68ef4ef47c4a0deb139474a10d0 100644 --- a/src/Quasar/Resources/Disposer.hs +++ b/src/Quasar/Resources/Disposer.hs @@ -83,12 +83,12 @@ beginDisposeFnDisposer worker exChan disposeState finalizers = startDisposeFn :: DisposeFn -> STM (Awaitable ()) startDisposeFn disposeFn = do awaitableVar <- newAsyncVarSTM - startTrivialIO_ worker exChan (runDisposeFn awaitableVar disposeFn) + startShortIO_ worker exChan (runDisposeFn awaitableVar disposeFn) pure $ join (toAwaitable awaitableVar) runDisposeFn :: AsyncVar (Awaitable ()) -> DisposeFn -> IO () runDisposeFn awaitableVar disposeFn = mask_ $ handleAll exceptionHandler do - awaitable <- disposeFn + awaitable <- disposeFn putAsyncVar_ awaitableVar awaitable runFinalizersAfter finalizers awaitable where @@ -185,7 +185,7 @@ beginDisposeResourceManagerInternal rm = do dependenciesVar <- newAsyncVarSTM writeTVar (resourceManagerState rm) (ResourceManagerDisposing (toAwaitable dependenciesVar)) attachedDisposers <- HM.elems <$> readTVar attachedResources - startTrivialIO_ worker undefined (void $ forkIO (disposeThread dependenciesVar attachedDisposers)) + startShortIO_ worker undefined (void $ forkIO (disposeThread dependenciesVar attachedDisposers)) pure $ DisposeDependencies rmKey (toAwaitable dependenciesVar) ResourceManagerDisposing deps -> pure $ DisposeDependencies rmKey deps ResourceManagerDisposed -> pure $ DisposeDependencies rmKey mempty