Add ShortIO newtype to track IO invariant for TIOWorker

parent b32fe965
......@@ -110,6 +110,7 @@ library
......@@ -9,19 +9,20 @@ module Quasar.Async.STMHelper (
) where
import Control.Concurrent (ThreadId, forkIO, forkIOWithUnmask)
import Control.Concurrent (ThreadId, forkIO)
import Control.Concurrent.STM
import Control.Exception (BlockedIndefinitelyOnSTM)
import Control.Monad.Catch
import Quasar.Awaitable
import Quasar.Exceptions
import Quasar.Prelude
import Quasar.Utils.ShortIO
newtype TIOWorker = TIOWorker (TQueue (IO ()))
startShortIO :: forall a. IO a -> TIOWorker -> ExceptionChannel -> STM (Awaitable a)
startShortIO :: forall a. ShortIO a -> TIOWorker -> ExceptionChannel -> STM (Awaitable a)
startShortIO fn (TIOWorker jobQueue) exChan = do
resultVar <- newAsyncVarSTM
writeTQueue jobQueue $ job resultVar
......@@ -29,13 +30,13 @@ startShortIO fn (TIOWorker jobQueue) exChan = do
job :: AsyncVar a -> IO ()
job resultVar = do
try fn >>= \case
try (runShortIO fn) >>= \case
Left ex -> do
atomically $ throwToExceptionChannel exChan ex
failAsyncVar_ resultVar $ toException $ AsyncException ex
Right result -> putAsyncVar_ resultVar result
startShortIO_ :: forall a. IO a -> TIOWorker -> ExceptionChannel -> STM ()
startShortIO_ :: forall a. ShortIO a -> TIOWorker -> ExceptionChannel -> STM ()
startShortIO_ x y z = void $ startShortIO x y z
......@@ -61,8 +62,8 @@ fork_ fn worker exChan = void $ fork fn worker exChan
forkWithUnmask :: ((forall a. IO a -> IO a) -> IO ()) -> TIOWorker -> ExceptionChannel -> STM (Awaitable ThreadId)
forkWithUnmask fn worker exChan = startShortIO forkFn worker exChan
forkFn :: IO ThreadId
forkFn = mask_ $ forkIOWithUnmask wrappedFn
forkFn :: ShortIO ThreadId
forkFn = mask_ $ forkIOWithUnmaskShortIO wrappedFn
wrappedFn :: (forall a. IO a -> IO a) -> IO ()
wrappedFn unmask = fn unmask `catchAll` \ex -> atomically (throwToExceptionChannel exChan ex)
......@@ -17,7 +17,6 @@ module Quasar.Async.V2 (
import Control.Concurrent (ThreadId)
import Control.Concurrent.STM
import Control.Exception (throwTo)
import Control.Monad.Catch
import Quasar.Async.STMHelper
import Quasar.Awaitable
......@@ -25,6 +24,7 @@ import Quasar.Exceptions
import Quasar.Monad
import Quasar.Prelude
import Quasar.Resources.Disposer
import Quasar.Utils.ShortIO
import Control.Monad.Reader
......@@ -64,12 +64,12 @@ unmanagedAsyncWithUnmask fn worker exChan = do
Right retVal -> do
putAsyncVar_ resultVar retVal
atomically $ disposeEventuallySTM_ disposer
disposeFn :: Unique -> AsyncVar a -> Awaitable ThreadId -> IO (Awaitable ())
disposeFn :: Unique -> AsyncVar a -> Awaitable ThreadId -> ShortIO (Awaitable ())
disposeFn key resultVar tidAwaitable = do
-- Awaits forking of the thread, which should happen immediately (as long as the TIOWorker-invariant isn't broken elsewhere)
tid <- await tidAwaitable
tid <- unsafeShortIO $ await tidAwaitable
-- `throwTo` should also happen immediately, as long as `uninterruptibleMask` isn't abused elsewhere
throwTo tid (CancelAsync key)
throwToShortIO tid (CancelAsync key)
-- Considered complete once a result (i.e. success or failure) has been stored
pure (() <$ toAwaitable resultVar)
......@@ -17,7 +17,6 @@ module Quasar.Resources.Disposer (
) where
import Control.Concurrent (forkIO)
import Control.Concurrent.STM
import Control.Monad (foldM)
import Control.Monad.Catch
......@@ -30,6 +29,7 @@ import Quasar.Async.STMHelper
import Quasar.Awaitable
import Quasar.Exceptions
import Quasar.Prelude
import Quasar.Utils.ShortIO
import Quasar.Utils.TOnce
......@@ -46,11 +46,10 @@ data Disposer
instance Resource Disposer where
getDisposer = id
type DisposeFn = IO (Awaitable ())
type DisposeFn = ShortIO (Awaitable ())
-- TODO document: IO has to be "short"
newPrimitiveDisposer :: IO (Awaitable ()) -> TIOWorker -> ExceptionChannel -> STM Disposer
newPrimitiveDisposer :: ShortIO (Awaitable ()) -> TIOWorker -> ExceptionChannel -> STM Disposer
newPrimitiveDisposer fn worker exChan = do
key <- newUniqueSTM
FnDisposer key worker exChan <$> newTOnce fn <*> newFinalizers
......@@ -95,18 +94,18 @@ beginDisposeFnDisposer worker exChan disposeState finalizers =
startShortIO_ (runDisposeFn awaitableVar disposeFn) worker exChan
pure $ join (toAwaitable awaitableVar)
runDisposeFn :: AsyncVar (Awaitable ()) -> DisposeFn -> IO ()
runDisposeFn :: AsyncVar (Awaitable ()) -> DisposeFn -> ShortIO ()
runDisposeFn awaitableVar disposeFn = mask_ $ handleAll exceptionHandler do
awaitable <- disposeFn
putAsyncVar_ awaitableVar awaitable
putAsyncVarShortIO_ awaitableVar awaitable
runFinalizersAfter finalizers awaitable
exceptionHandler :: SomeException -> IO ()
-- In case of an exception mark disposable as completed to prevent resource managers from being stuck indefinitely
exceptionHandler :: SomeException -> ShortIO ()
exceptionHandler ex = do
-- In case of an exception mark disposable as completed to prevent resource managers from being stuck indefinitely
putAsyncVar_ awaitableVar (pure ())
atomically $ runFinalizers finalizers
throwIO $ DisposeException ex
putAsyncVarShortIO_ awaitableVar (pure ())
runFinalizersShortIO finalizers
throwM $ DisposeException ex
disposerKey :: Disposer -> Unique
disposerKey (FnDisposer key _ _ _ _) = key
......@@ -193,7 +192,7 @@ beginDisposeResourceManagerInternal rm = do
dependenciesVar <- newAsyncVarSTM
writeTVar (resourceManagerState rm) (ResourceManagerDisposing (toAwaitable dependenciesVar))
attachedDisposers <- HM.elems <$> readTVar attachedResources
startShortIO_ (void $ forkIO (disposeThread dependenciesVar attachedDisposers)) worker exChan
startShortIO_ (void $ forkIOShortIO (disposeThread dependenciesVar attachedDisposers)) worker exChan
pure $ DisposeDependencies rmKey (toAwaitable dependenciesVar)
ResourceManagerDisposing deps -> pure $ DisposeDependencies rmKey deps
ResourceManagerDisposed -> pure $ DisposeDependencies rmKey mempty
......@@ -274,14 +273,17 @@ runFinalizers (Finalizers finalizerVar) = do
Just finalizers -> sequence_ finalizers
Nothing -> throwM $ userError "runFinalizers was called multiple times (it must only be run once)"
runFinalizersAfter :: Finalizers -> Awaitable () -> IO ()
runFinalizersShortIO :: Finalizers -> ShortIO ()
runFinalizersShortIO finalizers = unsafeShortIO $ atomically $ runFinalizers finalizers
runFinalizersAfter :: Finalizers -> Awaitable () -> ShortIO ()
runFinalizersAfter finalizers awaitable = do
-- Peek awaitable to ensure trivial disposables always run without forking
isCompleted <- isJust <$> peekAwaitable awaitable
isCompleted <- isJust <$> peekAwaitableShortIO awaitable
if isCompleted
atomically $ runFinalizers finalizers
runFinalizersShortIO finalizers
void $ forkIO do
void $ forkIOShortIO do
await awaitable
atomically $ runFinalizers finalizers
module Quasar.Utils.ShortIO (
-- ** Some specific functions required internally
) where
import Control.Monad.Catch
import Quasar.Awaitable
import Quasar.Prelude
import Control.Concurrent
newtype ShortIO a = ShortIO (IO a)
deriving newtype (Functor, Applicative, Monad, MonadThrow, MonadCatch, MonadMask)
runShortIO :: ShortIO a -> IO a
runShortIO (ShortIO fn) = fn
unsafeShortIO :: IO a -> ShortIO a
unsafeShortIO = ShortIO
forkIOShortIO :: IO () -> ShortIO ThreadId
forkIOShortIO fn = ShortIO $ forkIO fn
forkIOWithUnmaskShortIO :: ((forall a. IO a -> IO a) -> IO ()) -> ShortIO ThreadId
forkIOWithUnmaskShortIO fn = ShortIO $ forkIOWithUnmask fn
throwToShortIO :: Exception e => ThreadId -> e -> ShortIO ()
throwToShortIO tid ex = ShortIO $ throwTo tid ex
peekAwaitableShortIO :: Awaitable r -> ShortIO (Maybe r)
peekAwaitableShortIO awaitable = ShortIO $ peekAwaitable awaitable
putAsyncVarShortIO_ :: AsyncVar a -> a -> ShortIO ()
putAsyncVarShortIO_ var value = ShortIO $ putAsyncVar_ var value
