diff --git a/src/Quasar/Core.hs b/src/Quasar/Core.hs index b5e56e4dd8a60d1cbaedbaf1f4e4e2c25faa5775..4d44ba65e3be2a7632f3c1ed681c96b0fe42f5b9 100644 --- a/src/Quasar/Core.hs +++ b/src/Quasar/Core.hs @@ -23,120 +23,135 @@ import Quasar.Prelude -- * AsyncIO data AsyncIO r - = AsyncIOSuccess r - | AsyncIOFailure SomeException + = AsyncIOCompleted (Either SomeException r) | AsyncIOIO (IO r) - | AsyncIOAsync (Awaitable r) - | AsyncIOPlumbing (MaskingState -> CancellationToken -> IO (AsyncIO r)) + | AsyncIOAwait (Awaitable r) + | forall a. AsyncIOBind (AsyncIO a) (a -> AsyncIO r) + | forall e. Exception e => AsyncIOCatch (AsyncIO r) (e -> AsyncIO r) + | r ~ Pool => AsyncIOAskPool instance Functor AsyncIO where - fmap fn (AsyncIOSuccess x) = AsyncIOSuccess (fn x) - fmap _ (AsyncIOFailure x) = AsyncIOFailure x + fmap fn (AsyncIOCompleted x) = AsyncIOCompleted (fn <$> x) fmap fn (AsyncIOIO x) = AsyncIOIO (fn <$> x) - fmap fn (AsyncIOAsync x) = AsyncIOAsync (fn <$> x) - fmap fn (AsyncIOPlumbing x) = mapPlumbing x (fmap (fmap fn)) + fmap fn (AsyncIOAwait x) = AsyncIOAwait (fn <$> x) + fmap fn (AsyncIOBind x y) = AsyncIOBind x (fn <<$>> y) + fmap fn (AsyncIOCatch x y) = AsyncIOCatch (fn <$> x) (fn <<$>> y) + fmap fn AsyncIOAskPool = AsyncIOBind AsyncIOAskPool (pure . fn) instance Applicative AsyncIO where - pure = AsyncIOSuccess + pure = AsyncIOCompleted . Right (<*>) pf px = pf >>= \f -> f <$> px liftA2 f px py = px >>= \x -> f x <$> py instance Monad AsyncIO where (>>=) :: forall a b. AsyncIO a -> (a -> AsyncIO b) -> AsyncIO b - (>>=) (AsyncIOSuccess x) fn = fn x - (>>=) (AsyncIOFailure x) _ = AsyncIOFailure x - (>>=) (AsyncIOIO x) fn = AsyncIOPlumbing $ \maskingState cancellationToken -> do - -- TODO masking and cancellation - either AsyncIOFailure fn <$> try x - (>>=) (AsyncIOAsync x) fn = bindAsync x fn - (>>=) (AsyncIOPlumbing x) fn = mapPlumbing x (fmap (>>= fn)) + x >>= fn = AsyncIOBind x fn instance MonadIO AsyncIO where liftIO = AsyncIOIO instance MonadThrow AsyncIO where - throwM = AsyncIOFailure . toException + throwM = AsyncIOCompleted . Left . toException instance MonadCatch AsyncIO where catch :: Exception e => AsyncIO a -> (e -> AsyncIO a) -> AsyncIO a - catch x@(AsyncIOSuccess _) _ = x - catch x@(AsyncIOFailure ex) handler = maybe x handler (fromException ex) - catch (AsyncIOIO x) handler = AsyncIOIO (try x) >>= handleEither handler - catch (AsyncIOAsync x) handler = bindAsyncCatch x (handleEither handler) - catch (AsyncIOPlumbing x) handler = mapPlumbing x (fmap (`catch` handler)) - -handleEither :: Exception e => (e -> AsyncIO a) -> Either SomeException a -> AsyncIO a -handleEither handler (Left ex) = maybe (AsyncIOFailure ex) handler (fromException ex) -handleEither _ (Right r) = pure r - -mapPlumbing :: (MaskingState -> CancellationToken -> IO (AsyncIO a)) -> (IO (AsyncIO a) -> IO (AsyncIO b)) -> AsyncIO b -mapPlumbing plumbing fn = AsyncIOPlumbing $ \maskingState cancellationToken -> fn (plumbing maskingState cancellationToken) - -bindAsync :: forall a b. Awaitable a -> (a -> AsyncIO b) -> AsyncIO b -bindAsync x fn = bindAsyncCatch x (either AsyncIOFailure fn) - -bindAsyncCatch :: forall a b. Awaitable a -> (Either SomeException a -> AsyncIO b) -> AsyncIO b -bindAsyncCatch x fn = undefined -- AsyncIOPlumbing $ \maskingState cancellationToken -> do - --var <- newAsyncVar - --disposableMVar <- newEmptyMVar - --go maskingState cancellationToken var disposableMVar - --where - -- go maskingState cancellationToken var disposableMVar = do - -- disposable <- onResult x (failAsyncVar_ var) $ \x -> do - -- (putAsyncIOResult . fn) x - -- -- TODO update mvar and dispose when completed - -- putMVar disposableMVar disposable - -- pure $ awaitUnlessCancellationRequested cancellationToken var - -- where - -- put = putAsyncVarEither var - -- putAsyncIOResult :: AsyncIO b -> IO () - -- putAsyncIOResult (AsyncIOSuccess x) = put (Right x) - -- putAsyncIOResult (AsyncIOFailure x) = put (Left x) - -- putAsyncIOResult (AsyncIOIO x) = try x >>= put - -- putAsyncIOResult (AsyncIOAsync x) = onResult_ x (put . Left) put - -- putAsyncIOResult (AsyncIOPlumbing x) = x maskingState cancellationToken >>= putAsyncIOResult - + catch = AsyncIOCatch -- | Run the synchronous part of an `AsyncIO` and then return an `Awaitable` that can be used to wait for completion of the synchronous part. async :: AsyncIO r -> AsyncIO (Awaitable r) -async (AsyncIOSuccess x) = pure $ successfulAwaitable x -async (AsyncIOFailure x) = pure $ failedAwaitable x -async (AsyncIOIO x) = liftIO $ either failedAwaitable successfulAwaitable <$> try x -async (AsyncIOAsync x) = pure x -- TODO caching -async (AsyncIOPlumbing x) = mapPlumbing x (fmap async) +async (AsyncIOCompleted x) = pure $ completedAwaitable x +async (AsyncIOAwait x) = pure x +async x = do + pool <- askPool + liftIO . atomically $ queueWork pool x + +askPool :: AsyncIO Pool +askPool = AsyncIOAskPool await :: IsAwaitable r a => a -> AsyncIO r -await = AsyncIOAsync . toAwaitable +await = AsyncIOAwait . toAwaitable -- | Run an `AsyncIO` to completion and return the result. runAsyncIO :: AsyncIO r -> IO r -runAsyncIO (AsyncIOSuccess x) = pure x -runAsyncIO (AsyncIOFailure x) = throwIO x -runAsyncIO (AsyncIOIO x) = x -runAsyncIO (AsyncIOAsync x) = either throwIO pure =<< atomically (awaitSTM x) -runAsyncIO (AsyncIOPlumbing x) = do - maskingState <- getMaskingState - withCancellationToken $ x maskingState >=> runAsyncIO +runAsyncIO x = withDefaultPool $ \pool -> runAsyncIOWithPool pool x + +runAsyncIOWithPool :: Pool -> AsyncIO r -> IO r +runAsyncIOWithPool pool x = do + stepResult <- stepAsyncIO pool x + case stepResult of + Left awaitable -> either throwIO pure =<< atomically (awaitSTM awaitable) + Right result -> pure result + + +stepAsyncIO :: Pool -> AsyncIO r -> IO (Either (Awaitable r) r) +stepAsyncIO pool = go + where + go :: AsyncIO r -> IO (Either (Awaitable r) r) + go (AsyncIOCompleted x) = Right <$> either throwIO pure x + go (AsyncIOIO x) = Right <$> x + go (AsyncIOAwait x) = pure $ Left x + go (AsyncIOBind x fn) = do + go x >>= \case + Left awaitable -> bindAwaitable awaitable (either throwM fn) + Right r -> go (fn r) + go (AsyncIOCatch x handler) = do + try (go x) >>= \case + Left ex -> go (handler ex) + Right (Left awaitable) -> bindAwaitable awaitable (either (handleSomeException handler) pure) + Right (Right r) -> pure $ Right r + go AsyncIOAskPool = pure $ Right pool + + bindAwaitable :: Awaitable a -> (Either SomeException a -> AsyncIO r) -> IO (Either (Awaitable r) r) + bindAwaitable input work = fmap Left . atomically $ queueBlockedWork pool input work + +handleSomeException :: (Exception e, MonadThrow m) => (e -> m a) -> SomeException -> m a +handleSomeException handler ex = maybe (throwM ex) handler (fromException ex) awaitResult :: AsyncIO (Awaitable r) -> AsyncIO r awaitResult = (await =<<) +-- TODO rename +-- AsyncIOPool +-- AsyncPool +-- ThreadPool +-- AsyncIORuntime +-- AsyncIOContext +data Pool = Pool { + queue :: TVar [AsyncQueueItem] +} + +data AsyncQueueItem = forall a. AsyncQueueItem (Awaitable a) (Either SomeException a -> AsyncIO ()) +withPool :: (Pool -> IO a) -> IO a +withPool = undefined --- ** Forking asyncs +withDefaultPool :: (Pool -> IO a) -> IO a +withDefaultPool = (=<< atomically defaultPool) --- TODO ---class IsAsyncForkable m where --- asyncThread :: m r -> AsyncIO r +defaultPool :: STM Pool +defaultPool = do + queue <- newTVar [] + pure Pool { + queue + } +queueWork :: Pool -> AsyncIO r -> STM (Awaitable r) +queueWork pool work = queueBlockedWork pool (successfulAwaitable ()) (const work) +queueBlockedWork :: Pool -> Awaitable a -> (Either SomeException a -> AsyncIO r) -> STM (Awaitable r) +queueBlockedWork pool input work = do + resultVar <- newAsyncVarSTM + -- TODO masking state + let actualWork = try . work >=> putAsyncVarEither_ resultVar + undefined + pure $ toAwaitable resultVar -- * Awaiting multiple asyncs awaitEither :: (IsAwaitable ra a , IsAwaitable rb b) => a -> b -> AsyncIO (Either ra rb) -awaitEither x y = AsyncIOPlumbing $ \_ _ -> AsyncIOAsync <$> awaitEitherPlumbing x y +awaitEither x y = await =<< liftIO (awaitEitherPlumbing x y) awaitEitherPlumbing :: (IsAwaitable ra a , IsAwaitable rb b) => a -> b -> IO (Awaitable (Either ra rb)) awaitEitherPlumbing x y = awaitableFromSTM $ peekEitherSTM x y