diff --git a/src/Quasar/Core.hs b/src/Quasar/Core.hs index 99d089212541215cd2b34cc203bcdb80133eb11b..64f684e0554056edea5278e6c8f6545e21bd5c20 100644 --- a/src/Quasar/Core.hs +++ b/src/Quasar/Core.hs @@ -1,14 +1,14 @@ module Quasar.Core ( - -- * ResourceManager - ResourceManager, - ResourceManagerConfiguraiton(..), - HasResourceManager(..), - withResourceManager, - withDefaultResourceManager, - withUnlimitedResourceManager, - newResourceManager, - defaultResourceManagerConfiguration, - unlimitedResourceManagerConfiguration, + -- * AsyncManager + AsyncManager, + AsyncManagerConfiguraiton(..), + MonadAsync(..), + withAsyncManager, + withDefaultAsyncManager, + withUnlimitedAsyncManager, + newAsyncManager, + defaultAsyncManagerConfiguration, + unlimitedAsyncManagerConfiguration, -- * Task Task, @@ -32,6 +32,10 @@ module Quasar.Core ( newDisposable, synchronousDisposable, noDisposable, + + -- ** ResourceManager + ResourceManager, + newResourceManager, disposeEventually, attachDisposable, attachDisposeAction, @@ -48,58 +52,62 @@ import Quasar.Prelude --- | A monad for actions that run on a thread bound to a `ResourceManager`. -newtype AsyncIO a = AsyncIO (ReaderT ResourceManager IO a) +-- | A monad for actions that run on a thread bound to a `AsyncManager`. +newtype AsyncIO a = AsyncIO (ReaderT AsyncManager IO a) deriving newtype (Functor, Applicative, Monad, MonadIO, MonadThrow, MonadCatch, MonadMask) -- | Run the synchronous part of an `AsyncIO` and then return an `Awaitable` that can be used to wait for completion of the synchronous part. -async :: HasResourceManager m => AsyncIO r -> m (Task r) +async :: MonadAsync m => AsyncIO r -> m (Task r) async action = asyncWithUnmask (\unmask -> unmask action) -- | Run the synchronous part of an `AsyncIO` and then return an `Awaitable` that can be used to wait for completion of the synchronous part. -asyncWithUnmask :: HasResourceManager m => ((forall a. AsyncIO a -> AsyncIO a) -> AsyncIO r) -> m (Task r) +asyncWithUnmask :: MonadAsync m => ((forall a. AsyncIO a -> AsyncIO a) -> AsyncIO r) -> m (Task r) -- TODO resource limits asyncWithUnmask action = do - resourceManager <- askResourceManager + asyncManager <- askAsyncManager resultVar <- newAsyncVar liftIO $ mask_ $ do void $ forkIOWithUnmask $ \unmask -> do - result <- try $ runOnResourceManager resourceManager (action (liftUnmask unmask)) + result <- try $ runOnAsyncManager asyncManager (action (liftUnmask unmask)) putAsyncVarEither_ resultVar result pure $ Task (toAwaitable resultVar) liftUnmask :: (IO a -> IO a) -> AsyncIO a -> AsyncIO a liftUnmask unmask action = do - resourceManager <- askResourceManager - liftIO $ unmask $ runOnResourceManager resourceManager action + asyncManager <- askAsyncManager + liftIO $ unmask $ runOnAsyncManager asyncManager action await :: IsAwaitable r a => a -> AsyncIO r -- TODO resource limits await = liftIO . awaitIO -class MonadIO m => HasResourceManager m where - askResourceManager :: m ResourceManager +class MonadIO m => MonadAsync m where + askAsyncManager :: m AsyncManager -instance HasResourceManager AsyncIO where - askResourceManager = AsyncIO ask +instance MonadAsync AsyncIO where + askAsyncManager = AsyncIO ask -instance MonadIO m => HasResourceManager (ReaderT ResourceManager m) where - askResourceManager = ask +instance MonadIO m => MonadAsync (ReaderT AsyncManager m) where + askAsyncManager = ask awaitResult :: IsAwaitable r a => AsyncIO a -> AsyncIO r awaitResult = (await =<<) -data ResourceManager = ResourceManager { - configuration :: ResourceManagerConfiguraiton, +data AsyncManager = AsyncManager { + resourceManager :: ResourceManager, + configuration :: AsyncManagerConfiguraiton, threads :: TVar (HashSet ThreadId) } -instance IsDisposable ResourceManager where +instance IsDisposable AsyncManager where toDisposable = undefined +instance HasResourceManager AsyncManager where + getResourceManager = resourceManager + -- | A task that is running asynchronously. It has a result and can fail. -- The result (or exception) can be aquired by using the `IsAwaitable` class (e.g. by calling `await` or `awaitIO`). @@ -152,43 +160,43 @@ data CancelledTask = CancelledTask instance Exception CancelledTask where -data ResourceManagerConfiguraiton = ResourceManagerConfiguraiton { +data AsyncManagerConfiguraiton = AsyncManagerConfiguraiton { maxThreads :: Maybe Int } -defaultResourceManagerConfiguration :: ResourceManagerConfiguraiton -defaultResourceManagerConfiguration = ResourceManagerConfiguraiton { +defaultAsyncManagerConfiguration :: AsyncManagerConfiguraiton +defaultAsyncManagerConfiguration = AsyncManagerConfiguraiton { maxThreads = Just 1 } -unlimitedResourceManagerConfiguration :: ResourceManagerConfiguraiton -unlimitedResourceManagerConfiguration = ResourceManagerConfiguraiton { +unlimitedAsyncManagerConfiguration :: AsyncManagerConfiguraiton +unlimitedAsyncManagerConfiguration = AsyncManagerConfiguraiton { maxThreads = Nothing } -withResourceManager :: ResourceManagerConfiguraiton -> AsyncIO r -> IO r -withResourceManager configuration = bracket (newResourceManager configuration) disposeResourceManager . flip runOnResourceManager +withAsyncManager :: AsyncManagerConfiguraiton -> AsyncIO r -> IO r +withAsyncManager configuration = bracket (newAsyncManager configuration) disposeAsyncManager . flip runOnAsyncManager -runOnResourceManager :: ResourceManager -> AsyncIO r -> IO r -runOnResourceManager resourceManager (AsyncIO action) = runReaderT action resourceManager +runOnAsyncManager :: AsyncManager -> AsyncIO r -> IO r +runOnAsyncManager asyncManager (AsyncIO action) = runReaderT action asyncManager -withDefaultResourceManager :: AsyncIO a -> IO a -withDefaultResourceManager = withResourceManager defaultResourceManagerConfiguration +withDefaultAsyncManager :: AsyncIO a -> IO a +withDefaultAsyncManager = withAsyncManager defaultAsyncManagerConfiguration -withUnlimitedResourceManager :: AsyncIO a -> IO a -withUnlimitedResourceManager = withResourceManager unlimitedResourceManagerConfiguration +withUnlimitedAsyncManager :: AsyncIO a -> IO a +withUnlimitedAsyncManager = withAsyncManager unlimitedAsyncManagerConfiguration -newResourceManager :: ResourceManagerConfiguraiton -> IO ResourceManager -newResourceManager configuration = do +newAsyncManager :: AsyncManagerConfiguraiton -> IO AsyncManager +newAsyncManager configuration = do threads <- newTVarIO mempty - pure ResourceManager { + pure AsyncManager { configuration, threads } -disposeResourceManager :: ResourceManager -> IO () +disposeAsyncManager :: AsyncManager -> IO () -- TODO resource management -disposeResourceManager = const (pure ()) +disposeAsyncManager = const (pure ()) @@ -292,6 +300,18 @@ synchronousDisposable = newDisposable . fmap pure . liftIO noDisposable :: Disposable noDisposable = mempty + +data ResourceManager = ResourceManager + +class HasResourceManager a where + getResourceManager :: a -> ResourceManager + +instance IsDisposable ResourceManager where + toDisposable = undefined + +newResourceManager :: IO ResourceManager +newResourceManager = pure ResourceManager + -- | Start disposing a resource but instead of waiting for the operation to complete, pass the responsibility to a `ResourceManager`. -- -- The synchronous part of the `dispose`-Function will be run immediately but the resulting `Awaitable` will be passed to the resource manager. diff --git a/src/Quasar/Observable.hs b/src/Quasar/Observable.hs index d4e8d8136e0463ba7f671ce06bbfe4d5a591c8f4..46130c4c65baa034f729f318a8823682f9906295 100644 --- a/src/Quasar/Observable.hs +++ b/src/Quasar/Observable.hs @@ -64,10 +64,10 @@ instance Applicative ObservableMessage where class IsRetrievable v a | a -> v where - retrieve :: HasResourceManager m => a -> m (Task v) + retrieve :: MonadAsync m => a -> m (Task v) retrieveIO :: IsRetrievable v a => a -> IO v -retrieveIO x = awaitIO =<< withDefaultResourceManager (retrieve x) +retrieveIO x = awaitIO =<< withDefaultAsyncManager (retrieve x) class IsRetrievable v o => IsObservable v o | o -> v where observe :: o -> (ObservableMessage v -> IO ()) -> IO Disposable @@ -78,8 +78,8 @@ class IsRetrievable v o => IsObservable v o | o -> v where mapObservable :: (v -> a) -> o -> Observable a mapObservable f = Observable . MappedObservable f --- | Observe until the callback returns `False`. The callback will also be unsubscribed when the `ResourceManager` is disposed. -observeWhile :: (IsObservable v o, HasResourceManager m) => o -> (ObservableMessage v -> IO Bool) -> m Disposable +-- | Observe until the callback returns `False`. The callback will also be unsubscribed when the `AsyncManager` is disposed. +observeWhile :: (IsObservable v o, MonadAsync m) => o -> (ObservableMessage v -> IO Bool) -> m Disposable observeWhile observable callback = do --disposeVar <- liftIO $ newTVarIO False @@ -104,7 +104,7 @@ observeWhile observable callback = do -- | Observe until the callback returns `False`. The callback will also be unsubscribed when the `ResourceManager` is disposed. -observeWhile_ :: (IsObservable v o, HasResourceManager m) => o -> (ObservableMessage v -> IO Bool) -> m () +observeWhile_ :: (IsObservable v o, MonadAsync m) => o -> (ObservableMessage v -> IO Bool) -> m () observeWhile_ observable callback = -- The disposable is already attached to the resource manager, so voiding it is safe. void $ observeWhile observable callback @@ -170,7 +170,7 @@ instance IsObservable r (BindObservable r) where observe :: BindObservable r -> (ObservableMessage r -> IO ()) -> IO Disposable observe (BindObservable fx fn) callback = do -- Create a resource manager to ensure all subscriptions are cleaned up when disposing. - resourceManager <- newResourceManager unlimitedResourceManagerConfiguration + resourceManager <- newResourceManager isDisposingVar <- newTVarIO False disposableVar <- newTMVarIO noDisposable @@ -238,7 +238,7 @@ instance IsObservable r (CatchObservable e r) where observe :: CatchObservable e r -> (ObservableMessage r -> IO ()) -> IO Disposable observe (CatchObservable fx fn) callback = do -- Create a resource manager to ensure all subscriptions are cleaned up when disposing. - resourceManager <- newResourceManager unlimitedResourceManagerConfiguration + resourceManager <- newResourceManager isDisposingVar <- newTVarIO False disposableVar <- newTMVarIO noDisposable @@ -382,7 +382,7 @@ mergeObservable :: (IsObservable v0 o0, IsObservable v1 o1) => (v0 -> v1 -> r) - mergeObservable merge x y = Observable $ MergedObservable merge x y data FnObservable v = FnObservable { - retrieveFn :: forall m. HasResourceManager m => m (Task v), + retrieveFn :: forall m. MonadAsync m => m (Task v), observeFn :: (ObservableMessage v -> IO ()) -> IO Disposable } instance IsRetrievable v (FnObservable v) where @@ -397,7 +397,7 @@ instance IsObservable v (FnObservable v) where -- | Implement an Observable by directly providing functions for `retrieve` and `subscribe`. fnObservable :: ((ObservableMessage v -> IO ()) -> IO Disposable) - -> (forall m. HasResourceManager m => m (Task v)) + -> (forall m. MonadAsync m => m (Task v)) -> Observable v fnObservable observeFn retrieveFn = toObservable FnObservable{observeFn, retrieveFn} @@ -408,7 +408,7 @@ synchronousFnObservable -> Observable v synchronousFnObservable observeFn synchronousRetrieveFn = fnObservable observeFn retrieveFn where - retrieveFn :: (forall m. HasResourceManager m => m (Task v)) + retrieveFn :: (forall m. MonadAsync m => m (Task v)) retrieveFn = liftIO $ successfulTask <$> synchronousRetrieveFn diff --git a/test/Quasar/AsyncSpec.hs b/test/Quasar/AsyncSpec.hs index 098af3ff335240841978f106de45f3b1a2a769ce..0ad11ccf970ee02b2503117526d0311555f39a55 100644 --- a/test/Quasar/AsyncSpec.hs +++ b/test/Quasar/AsyncSpec.hs @@ -22,46 +22,46 @@ spec = parallel $ do describe "AsyncIO" $ do it "binds pure operations" $ do - withDefaultResourceManager (pure () >>= \() -> pure ()) + withDefaultAsyncManager (pure () >>= \() -> pure ()) it "binds IO actions" $ do m1 <- newEmptyMVar m2 <- newEmptyMVar - withDefaultResourceManager (liftIO (putMVar m1 ()) >>= \() -> liftIO (putMVar m2 ())) + withDefaultAsyncManager (liftIO (putMVar m1 ()) >>= \() -> liftIO (putMVar m2 ())) tryTakeMVar m1 `shouldReturn` Just () tryTakeMVar m2 `shouldReturn` Just () it "can continue after awaiting an already finished operation" $ do - withDefaultResourceManager (await =<< async (pure 42 :: AsyncIO Int)) `shouldReturn` 42 + withDefaultAsyncManager (await =<< async (pure 42 :: AsyncIO Int)) `shouldReturn` 42 it "can fmap the result of an already finished async" $ do avar <- newAsyncVar :: IO (AsyncVar ()) putAsyncVar_ avar () - withDefaultResourceManager (id <$> await avar) + withDefaultAsyncManager (id <$> await avar) it "can fmap the result of an async that is completed later" $ do avar <- newAsyncVar :: IO (AsyncVar ()) void $ forkIO $ do threadDelay 100000 putAsyncVar_ avar () - withDefaultResourceManager (id <$> await avar) + withDefaultAsyncManager (id <$> await avar) it "can bind the result of an already finished async" $ do avar <- newAsyncVar :: IO (AsyncVar ()) putAsyncVar_ avar () - withDefaultResourceManager (await avar >>= pure) + withDefaultAsyncManager (await avar >>= pure) it "can bind the result of an async that is completed later" $ do avar <- newAsyncVar :: IO (AsyncVar ()) void $ forkIO $ do threadDelay 100000 putAsyncVar_ avar () - withDefaultResourceManager (await avar >>= pure) + withDefaultAsyncManager (await avar >>= pure) it "can terminate when encountering an asynchronous exception" $ do never <- newAsyncVar :: IO (AsyncVar ()) - result <- timeout 100000 $ withDefaultResourceManager $ + result <- timeout 100000 $ withDefaultAsyncManager $ -- Use bind to create an AsyncIOPlumbing, which is the interesting case that uses `uninterruptibleMask` when run await never >>= pure result `shouldBe` Nothing