diff --git a/src/Quasar/Async/STMHelper.hs b/src/Quasar/Async/STMHelper.hs index 610168041121d010f213598977c5a934cb248689..623379ef07bb8bd785aa8df5ea38aa70a15ba275 100644 --- a/src/Quasar/Async/STMHelper.hs +++ b/src/Quasar/Async/STMHelper.hs @@ -21,8 +21,8 @@ import Quasar.Prelude newtype TIOWorker = TIOWorker (TQueue (IO ())) -startShortIO :: forall a. TIOWorker -> ExceptionChannel -> IO a -> STM (Awaitable a) -startShortIO (TIOWorker jobQueue) exChan fn = do +startShortIO :: forall a. IO a -> TIOWorker -> ExceptionChannel -> STM (Awaitable a) +startShortIO fn (TIOWorker jobQueue) exChan = do resultVar <- newAsyncVarSTM writeTQueue jobQueue $ job resultVar pure $ toAwaitable resultVar @@ -35,7 +35,7 @@ startShortIO (TIOWorker jobQueue) exChan fn = do failAsyncVar_ resultVar $ toException $ AsyncException ex Right result -> putAsyncVar_ resultVar result -startShortIO_ :: forall a. TIOWorker -> ExceptionChannel -> IO a -> STM () +startShortIO_ :: forall a. IO a -> TIOWorker -> ExceptionChannel -> STM () startShortIO_ x y z = void $ startShortIO x y z @@ -51,20 +51,20 @@ newTIOWorker = do pure $ TIOWorker jobQueue -fork :: TIOWorker -> ExceptionChannel -> IO () -> STM (Awaitable ThreadId) -fork worker exChan fn = forkWithUnmask worker exChan (\unmask -> unmask fn) +fork :: IO () -> TIOWorker -> ExceptionChannel -> STM (Awaitable ThreadId) +fork fn = forkWithUnmask (\unmask -> unmask fn) -fork_ :: TIOWorker -> ExceptionChannel -> IO () -> STM () -fork_ worker exChan fn = void $ fork worker exChan fn +fork_ :: IO () -> TIOWorker -> ExceptionChannel -> STM () +fork_ fn worker exChan = void $ fork fn worker exChan -forkWithUnmask :: TIOWorker -> ExceptionChannel -> ((forall a. IO a -> IO a) -> IO ()) -> STM (Awaitable ThreadId) -forkWithUnmask worker exChan fn = startShortIO worker exChan launcher +forkWithUnmask :: ((forall a. IO a -> IO a) -> IO ()) -> TIOWorker -> ExceptionChannel -> STM (Awaitable ThreadId) +forkWithUnmask fn worker exChan = startShortIO forkFn worker exChan where - launcher :: IO ThreadId - launcher = mask_ $ forkIOWithUnmask wrappedFn + forkFn :: IO ThreadId + forkFn = mask_ $ forkIOWithUnmask wrappedFn wrappedFn :: (forall a. IO a -> IO a) -> IO () wrappedFn unmask = fn unmask `catchAll` \ex -> atomically (throwToExceptionChannel exChan ex) -forkWithUnmask_ :: TIOWorker -> ExceptionChannel -> ((forall a. IO a -> IO a) -> IO ()) -> STM () -forkWithUnmask_ worker exChan fn = void $ forkWithUnmask worker exChan fn +forkWithUnmask_ :: ((forall a. IO a -> IO a) -> IO ()) -> TIOWorker -> ExceptionChannel -> STM () +forkWithUnmask_ fn worker exChan = void $ forkWithUnmask fn worker exChan diff --git a/src/Quasar/Async/V2.hs b/src/Quasar/Async/V2.hs index 4952689835b8c80f877994feeda307c77551c6da..1f8a90600d943ee517092745a0f0a7ad385590b1 100644 --- a/src/Quasar/Async/V2.hs +++ b/src/Quasar/Async/V2.hs @@ -37,16 +37,16 @@ instance IsAwaitable a (Async a) where toAwaitable (Async awaitable _) = awaitable -unmanagedAsync :: TIOWorker -> ExceptionChannel -> IO a -> STM (Async a) -unmanagedAsync worker exChan fn = unmanagedAsyncWithUnmask worker exChan \unmask -> unmask fn +unmanagedAsync :: IO a -> TIOWorker -> ExceptionChannel -> STM (Async a) +unmanagedAsync fn = unmanagedAsyncWithUnmask (\unmask -> unmask fn) -unmanagedAsyncWithUnmask :: forall a. TIOWorker -> ExceptionChannel -> ((forall b. IO b -> IO b) -> IO a) -> STM (Async a) -unmanagedAsyncWithUnmask worker exChan fn = do +unmanagedAsyncWithUnmask :: forall a. ((forall b. IO b -> IO b) -> IO a) -> TIOWorker -> ExceptionChannel -> STM (Async a) +unmanagedAsyncWithUnmask fn worker exChan = do key <- newUniqueSTM resultVar <- newAsyncVarSTM disposer <- mfix \disposer -> do - tidAwaitable <- forkWithUnmask worker exChan (runAndPut key resultVar disposer) - newPrimitiveDisposer worker exChan (disposeFn key resultVar tidAwaitable) + tidAwaitable <- forkWithUnmask (runAndPut key resultVar disposer) worker exChan + newPrimitiveDisposer (disposeFn key resultVar tidAwaitable) worker exChan pure $ Async (toAwaitable resultVar) disposer where runAndPut :: Unique -> AsyncVar a -> Disposer -> (forall b. IO b -> IO b) -> IO () @@ -84,7 +84,7 @@ asyncWithUnmask fn = do exChan <- askExceptionChannel rm <- askResourceManager runSTM do - as <- unmanagedAsyncWithUnmask worker exChan \unmask -> runReaderT (fn (liftUnmask unmask)) quasar + as <- unmanagedAsyncWithUnmask (\unmask -> runReaderT (fn (liftUnmask unmask)) quasar) worker exChan attachResource rm as pure as where diff --git a/src/Quasar/Resources.hs b/src/Quasar/Resources.hs index 3bf9dca665227f45d24849b17c89c6217732be3c..4520beec0958c86037cd03c631b26847a4bc8304 100644 --- a/src/Quasar/Resources.hs +++ b/src/Quasar/Resources.hs @@ -40,10 +40,10 @@ import Quasar.Prelude import Quasar.Resources.Disposer -newIODisposer :: TIOWorker -> ExceptionChannel -> IO () -> STM Disposer +newIODisposer :: IO () -> TIOWorker -> ExceptionChannel -> STM Disposer newIODisposer = undefined -newSTMDisposer :: TIOWorker -> ExceptionChannel -> STM () -> STM Disposer +newSTMDisposer :: STM () -> TIOWorker -> ExceptionChannel -> STM Disposer newSTMDisposer = undefined @@ -57,14 +57,14 @@ registerDisposeAction fn = do worker <- askIOWorker exChan <- askExceptionChannel rm <- askResourceManager - runSTM $ attachResource rm =<< newIODisposer worker exChan fn + runSTM $ attachResource rm =<< newIODisposer fn worker exChan registerDisposeTransaction :: MonadQuasar m => STM () -> m () registerDisposeTransaction fn = do worker <- askIOWorker exChan <- askExceptionChannel rm <- askResourceManager - runSTM $ attachResource rm =<< newSTMDisposer worker exChan fn + runSTM $ attachResource rm =<< newSTMDisposer fn worker exChan registerNewResource :: forall a m. (Resource a, MonadQuasar m) => m a -> m a registerNewResource fn = do diff --git a/src/Quasar/Resources/Disposer.hs b/src/Quasar/Resources/Disposer.hs index 75270d2210aba5a614a9c4ab01364e2be214f84d..50366876204ae1d9be016aad6f5ad429a999ede1 100644 --- a/src/Quasar/Resources/Disposer.hs +++ b/src/Quasar/Resources/Disposer.hs @@ -50,8 +50,8 @@ type DisposeFn = IO (Awaitable ()) -- TODO document: IO has to be "short" -newPrimitiveDisposer :: TIOWorker -> ExceptionChannel -> IO (Awaitable ()) -> STM Disposer -newPrimitiveDisposer worker exChan fn = do +newPrimitiveDisposer :: IO (Awaitable ()) -> TIOWorker -> ExceptionChannel -> STM Disposer +newPrimitiveDisposer fn worker exChan = do key <- newUniqueSTM FnDisposer key worker exChan <$> newTOnce fn <*> newFinalizers @@ -92,7 +92,7 @@ beginDisposeFnDisposer worker exChan disposeState finalizers = startDisposeFn :: DisposeFn -> STM (Awaitable ()) startDisposeFn disposeFn = do awaitableVar <- newAsyncVarSTM - startShortIO_ worker exChan (runDisposeFn awaitableVar disposeFn) + startShortIO_ (runDisposeFn awaitableVar disposeFn) worker exChan pure $ join (toAwaitable awaitableVar) runDisposeFn :: AsyncVar (Awaitable ()) -> DisposeFn -> IO () @@ -193,7 +193,7 @@ beginDisposeResourceManagerInternal rm = do dependenciesVar <- newAsyncVarSTM writeTVar (resourceManagerState rm) (ResourceManagerDisposing (toAwaitable dependenciesVar)) attachedDisposers <- HM.elems <$> readTVar attachedResources - startShortIO_ worker undefined (void $ forkIO (disposeThread dependenciesVar attachedDisposers)) + startShortIO_ (void $ forkIO (disposeThread dependenciesVar attachedDisposers)) worker undefined pure $ DisposeDependencies rmKey (toAwaitable dependenciesVar) ResourceManagerDisposing deps -> pure $ DisposeDependencies rmKey deps ResourceManagerDisposed -> pure $ DisposeDependencies rmKey mempty diff --git a/src/Quasar/Timer.hs b/src/Quasar/Timer.hs index 0c344e245b46054e98e45f8bf574c2d6fe3851ba..133b03a0f8d8c1a036ba505b73c44f75f958e850 100644 --- a/src/Quasar/Timer.hs +++ b/src/Quasar/Timer.hs @@ -180,17 +180,20 @@ newUnmanagedTimer scheduler time = liftIO do key <- newUnique completed <- newAsyncVar atomically do - disposer <- newSTMDisposer (ioWorker scheduler) (exceptionChannel scheduler) do - cancelled <- failAsyncVarSTM completed TimerCancelled - when cancelled do - modifyTVar (activeCount scheduler) (+ (-1)) - modifyTVar (cancelledCount scheduler) (+ 1) + disposer <- newSTMDisposer (disposeFn completed) (ioWorker scheduler) (exceptionChannel scheduler) let timer = Timer { key, time, completed, disposer, scheduler } tryTakeTMVar (heap scheduler) >>= \case Just timers -> putTMVar (heap scheduler) (insert timer timers) Nothing -> throwM TimerSchedulerDisposed modifyTVar (activeCount scheduler) (+ 1) pure timer + where + disposeFn :: AsyncVar () -> STM () + disposeFn completed = do + cancelled <- failAsyncVarSTM completed TimerCancelled + when cancelled do + modifyTVar (activeCount scheduler) (+ (-1)) + modifyTVar (cancelledCount scheduler) (+ 1) sleepUntil :: MonadIO m => TimerScheduler -> UTCTime -> m ()