Skip to content
Snippets Groups Projects
Fork.hs 3.21 KiB
Newer Older
-- | Fork threads whose uncaught exceptions are forwarded to an
-- 'ExceptionSink', either directly from 'IO' or scheduled from within 'STM'.
module Quasar.Async.Fork (
  -- * Forking with an asynchronous exception channel

  -- ** IO
  forkWithUnmask,
  forkWithUnmask_,
  forkFuture,
  forkFutureWithUnmask,

  -- ** STM
  forkSTM,
  forkSTM_,
  forkWithUnmaskSTM,
  forkWithUnmaskSTM_,
  forkAsyncSTM,
  forkAsyncWithUnmaskSTM,
) where

import Control.Concurrent (ThreadId, forkIOWithUnmask)
import Control.Monad.Catch
import Quasar.Async.STMHelper
import Quasar.Future
import Quasar.Exceptions
import Quasar.Prelude
import Quasar.Utils.ShortIO


Jens Nolte's avatar
Jens Nolte committed
-- * Fork in STM (with ExceptionSink)
Jens Nolte's avatar
Jens Nolte committed
-- | Fork @fn@ from within 'STM'.
--
-- Thin wrapper around 'forkWithUnmaskSTM': the whole action is wrapped in the
-- @unmask@ function handed to the forked thread. The result is a 'Future'
-- for the 'ThreadId' of the new thread.
--
-- NOTE(review): presumably the fork is carried out by the 'TIOWorker' once
-- the transaction has committed -- confirm against 'startShortIOSTM'.
forkSTM :: IO () -> TIOWorker -> ExceptionSink -> STM (Future ThreadId)
forkSTM fn = forkWithUnmaskSTM (\unmask -> unmask fn)
Jens Nolte's avatar
Jens Nolte committed
-- | Variant of 'forkSTM' that discards the resulting @'Future' 'ThreadId'@.
forkSTM_ :: IO () -> TIOWorker -> ExceptionSink -> STM ()
forkSTM_ fn worker exChan = void $ forkSTM fn worker exChan
Jens Nolte's avatar
Jens Nolte committed
-- | Schedule a thread fork from within 'STM'.
--
-- The fork itself is 'forkWithUnmask' (so @fn@ receives an unmask function
-- and exceptions escaping it are forwarded to @exChan@). It is wrapped with
-- 'unsafeShortIO' and handed to 'startShortIOSTM' together with @worker@ and
-- @exChan@. The result is a 'Future' for the new thread's 'ThreadId'.
forkWithUnmaskSTM :: ((forall a. IO a -> IO a) -> IO ()) -> TIOWorker -> ExceptionSink -> STM (Future ThreadId)
-- TODO change TIOWorker behavior for spawning threads, so no `unsafeShortIO` is necessary
forkWithUnmaskSTM fn worker exChan = startShortIOSTM spawnThread worker exChan
  where
    -- The actual fork, coerced into the short-IO action type.
    spawnThread = unsafeShortIO (forkWithUnmask fn exChan)
Jens Nolte's avatar
Jens Nolte committed
-- | Variant of 'forkWithUnmaskSTM' that discards the resulting
-- @'Future' 'ThreadId'@.
forkWithUnmaskSTM_ :: ((forall a. IO a -> IO a) -> IO ()) -> TIOWorker -> ExceptionSink -> STM ()
forkWithUnmaskSTM_ fn worker exChan = void $ forkWithUnmaskSTM fn worker exChan


Jens Nolte's avatar
Jens Nolte committed
-- | Schedule a result-producing thread fork from within 'STM'.
--
-- 'forkFuture' is wrapped with 'unsafeShortIO' and handed to
-- 'startShortIOSTM'; that yields a 'Future' of the 'Future' returned by
-- 'forkFuture', and the two layers are collapsed with 'join'.
forkAsyncSTM :: forall a. IO a -> TIOWorker -> ExceptionSink -> STM (Future a)
-- TODO change TIOWorker behavior for spawning threads, so no `unsafeShortIO` is necessary
forkAsyncSTM fn worker exChan = do
  nestedFuture <- startShortIOSTM spawnFuture worker exChan
  pure (join nestedFuture)
  where
    -- The actual fork, coerced into the short-IO action type.
    spawnFuture = unsafeShortIO (forkFuture fn exChan)
Jens Nolte's avatar
Jens Nolte committed

Jens Nolte's avatar
Jens Nolte committed
-- | Like 'forkAsyncSTM', but @fn@ receives the unmask function of the forked
-- thread (the fork is performed by 'forkFutureWithUnmask').
--
-- The nested 'Future' produced by 'startShortIOSTM' is collapsed with 'join'.
forkAsyncWithUnmaskSTM :: forall a. ((forall b. IO b -> IO b) -> IO a) -> TIOWorker -> ExceptionSink -> STM (Future a)
-- TODO change TIOWorker behavior for spawning threads, so no `unsafeShortIO` is necessary
forkAsyncWithUnmaskSTM fn worker exChan = do
  nestedFuture <- startShortIOSTM spawnFuture worker exChan
  pure (join nestedFuture)
  where
    -- The actual fork, coerced into the short-IO action type.
    spawnFuture = unsafeShortIO (forkFutureWithUnmask fn exChan)
-- * Fork in IO, redirecting errors to an ExceptionSink

-- | Fork a thread with 'forkIOWithUnmask', forwarding uncaught exceptions to
-- an 'ExceptionSink'.
--
-- The fork happens inside 'mask_', so the new thread starts out masked; @fn@
-- is given the unmask function supplied by 'forkIOWithUnmask' and decides
-- where asynchronous exceptions may be delivered. Any exception that escapes
-- @fn@ is caught with 'catchAll' and passed to @exChan@ via
-- 'throwToExceptionSink'.
--
-- Returns the 'ThreadId' of the new thread.
forkWithUnmask :: ((forall a. IO a -> IO a) -> IO ()) -> ExceptionSink -> IO ThreadId
forkWithUnmask fn exChan = mask_ $ forkIOWithUnmask wrappedFn
  where
    -- Runs the user action and reroutes anything it throws to the sink.
    wrappedFn :: (forall a. IO a -> IO a) -> IO ()
    wrappedFn unmask = fn unmask `catchAll` \ex -> atomically (throwToExceptionSink exChan ex)
-- | Variant of 'forkWithUnmask' that discards the 'ThreadId' of the new
-- thread.
forkWithUnmask_ :: ((forall a. IO a -> IO a) -> IO ()) -> ExceptionSink -> IO ()
forkWithUnmask_ fn exChan = do
  _threadId <- forkWithUnmask fn exChan
  pure ()
-- * Fork in IO while collecting the result, redirecting errors to an ExceptionSink

-- | Fork @fn@ and expose its result as a 'Future'.
--
-- Delegates to 'forkFutureWithUnmask', wrapping the whole action in the
-- @unmask@ function handed to the forked thread.
forkFuture :: forall a. IO a -> ExceptionSink -> IO (Future a)
forkFuture fn exChan = forkFutureWithUnmask (\unmask -> unmask fn) exChan
Jens Nolte's avatar
Jens Nolte committed

forkFutureWithUnmask :: forall a. ((forall b. IO b -> IO b) -> IO a) -> ExceptionSink -> IO (Future a)
forkFutureWithUnmask fn exChan = do
  resultVar <- newPromise
  forkWithUnmask_ (runAndPut resultVar) exChan
Jens Nolte's avatar
Jens Nolte committed
  pure $ toFuture resultVar
Jens Nolte's avatar
Jens Nolte committed
    runAndPut :: Promise a -> (forall b. IO b -> IO b) -> IO ()
    runAndPut resultVar unmask = do
      -- Called in masked state by `forkWithUnmask`
      result <- try $ fn unmask
      case result of
        Left ex ->
Jens Nolte's avatar
Jens Nolte committed
          atomically (throwToExceptionSink exChan ex)
Jens Nolte's avatar
Jens Nolte committed
              breakPromise resultVar (AsyncException ex)
Jens Nolte's avatar
Jens Nolte committed
          fulfillPromise resultVar retVal