From 7e01fd0cdc4e15b15fa7eef11290ab7595907a17 Mon Sep 17 00:00:00 2001 From: Jens Nolte <git@queezle.net> Date: Mon, 30 Aug 2021 02:25:15 +0200 Subject: [PATCH] Update quasar and change affected code --- flake.lock | 6 +- src/Quasar/Network/Multiplexer.hs | 5 + src/Quasar/Network/Runtime.hs | 8 +- src/Quasar/Network/Runtime/Observable.hs | 14 +-- src/Quasar/Network/TH.hs | 4 +- test/Quasar/NetworkSpec.hs | 111 ++++++++++++----------- 6 files changed, 79 insertions(+), 69 deletions(-) diff --git a/flake.lock b/flake.lock index c5dd0da..49b7e0a 100644 --- a/flake.lock +++ b/flake.lock @@ -21,11 +21,11 @@ }, "locked": { "host": "git.c3pb.de", - "lastModified": 1629328844, - "narHash": "sha256-4AiAjuSuTneWidipxxTu60xqztbum8OuTuKI9pWpUsQ=", + "lastModified": 1630286998, + "narHash": "sha256-2rGt7QPAHXuHP8mn/vxzXJotrGcv93jyQTZTPrwZ1w8=", "owner": "jens", "repo": "quasar", - "rev": "67efa0e198ac78aaaf308c2067019574548acf7c", + "rev": "499e025611d10540dbebf0e2732a0316a6e111bd", "type": "gitlab" }, "original": { diff --git a/src/Quasar/Network/Multiplexer.hs b/src/Quasar/Network/Multiplexer.hs index 445f512..999963a 100644 --- a/src/Quasar/Network/Multiplexer.hs +++ b/src/Quasar/Network/Multiplexer.hs @@ -107,12 +107,17 @@ instance Exception MultiplexerException data Channel = Channel { channelId :: ChannelId, + resourceManager :: ResourceManager, worker :: MultiplexerWorker, stateMVar :: MVar ChannelState, sendStateMVar :: MVar ChannelSendState, receiveStateMVar :: MVar ChannelReceiveState, handlerAtVar :: AtVar InternalChannelMessageHandler } + +instance HasResourceManager Channel where + getResourceManager channel = channel.resourceManager + data ChannelState = ChannelState { connectionState :: ChannelConnectivity, children :: [Channel] diff --git a/src/Quasar/Network/Runtime.hs b/src/Quasar/Network/Runtime.hs index c94ef6e..9a57e1b 100644 --- a/src/Quasar/Network/Runtime.hs +++ b/src/Quasar/Network/Runtime.hs @@ -47,8 +47,9 @@ import Data.Binary (Binary, encode, decodeOrFail) import qualified Data.ByteString.Lazy as BSL import qualified Data.HashMap.Strict as HM import qualified Network.Socket as Socket +import Quasar.Async import Quasar.Awaitable -import Quasar.Core +import Quasar.Disposable import Quasar.Network.Connection import Quasar.Network.Multiplexer import Quasar.Prelude @@ -65,7 +66,7 @@ type ProtocolResponseWrapper p = (MessageId, ProtocolResponse p) class RpcProtocol p => HasProtocolImpl p where type ProtocolImpl p - handleRequest :: HasResourceManager m => ProtocolImpl p -> Channel -> ProtocolRequest p -> [Channel] -> m (Maybe (Task (ProtocolResponse p))) + handleRequest :: MonadAsync m => ProtocolImpl p -> Channel -> ProtocolRequest p -> [Channel] -> m (Maybe (Task (ProtocolResponse p))) data Client p = Client { @@ -131,8 +132,7 @@ serverHandleChannelMessage protocolImpl channel resources msg = case decodeOrFai where serverHandleChannelRequest :: [Channel] -> ProtocolRequest p -> IO () serverHandleChannelRequest channels req = do - -- TODO resource manager should belong to the current channel/api - withDefaultResourceManager $ + onResourceManager channel $ handleRequest @p protocolImpl channel req channels >>= \case Nothing -> pure () Just task -> do diff --git a/src/Quasar/Network/Runtime/Observable.hs b/src/Quasar/Network/Runtime/Observable.hs index 3a4758c..18e48e5 100644 --- a/src/Quasar/Network/Runtime/Observable.hs +++ b/src/Quasar/Network/Runtime/Observable.hs @@ -5,8 +5,9 @@ module Quasar.Network.Runtime.Observable ( ) where import Data.Binary (Binary) +import Quasar.Async import Quasar.Awaitable -import Quasar.Core +import Quasar.Disposable import Quasar.Network.Exception import Quasar.Network.Runtime import Quasar.Observable @@ -44,11 +45,10 @@ newObservableStub startRetrieveRequest startObserveRequest = pure uncachedObserv stream <- startObserveRequest streamSetHandler stream (callback . unpackObservableMessage) synchronousDisposable $ streamClose stream - retrieveFn :: forall m. HasResourceManager m => m (Task v) + retrieveFn :: forall m. MonadAsync m => m (Task v) retrieveFn = toTask <$> startRetrieveRequest -observeToStream :: (Binary v, HasResourceManager m) => Observable v -> Stream (PackedObservableMessage v) Void -> m () -observeToStream observable stream = do - _disposable <- liftIO $ observe observable $ streamSend stream . packObservableMessage - -- TODO: dispose when the stream is closed - pure () +observeToStream :: (Binary v, MonadAsync m) => Observable v -> Stream (PackedObservableMessage v) Void -> m () +observeToStream observable stream = + asyncObserve_ observable \msg -> + streamSend stream $ packObservableMessage msg diff --git a/src/Quasar/Network/TH.hs b/src/Quasar/Network/TH.hs index 42a83d7..3ac952f 100644 --- a/src/Quasar/Network/TH.hs +++ b/src/Quasar/Network/TH.hs @@ -24,8 +24,8 @@ import Data.Maybe (isJust, isNothing) import GHC.Records.Compat (HasField) import Language.Haskell.TH hiding (interruptible) import Language.Haskell.TH.Syntax +import Quasar.Async import Quasar.Awaitable -import Quasar.Core import Quasar.Network.Multiplexer import Quasar.Network.Runtime import Quasar.Network.Runtime.Observable @@ -559,7 +559,7 @@ createResource RequestCreateChannel channelE = [|pure $channelE|] createResource (RequestCreateStream up down) channelE = [|newStream $channelE|] implResultType :: Request -> Q Type -implResultType req = [t|forall m. HasResourceManager m => m $(resultType)|] +implResultType req = [t|forall m. MonadAsync m => m $(resultType)|] where resultType = case req.mResponse of Nothing -> [t|()|] diff --git a/test/Quasar/NetworkSpec.hs b/test/Quasar/NetworkSpec.hs index daf81e5..465d1d3 100644 --- a/test/Quasar/NetworkSpec.hs +++ b/test/Quasar/NetworkSpec.hs @@ -16,8 +16,9 @@ import Control.Concurrent.STM import Control.Exception (toException) import Control.Monad.IO.Class (MonadIO, liftIO) import Quasar.Prelude +import Quasar.Async import Quasar.Awaitable -import Quasar.Core +import Quasar.Disposable import Quasar.Network import Quasar.Network.Runtime (withStandaloneClient) import Quasar.Network.TH (makeRpc) @@ -26,8 +27,6 @@ import Test.Hspec import Test.QuickCheck import Test.QuickCheck.Monadic -shouldReturnAsync :: (HasCallStack, IsAwaitable r a, Show r, Eq r) => AsyncIO a -> r -> AsyncIO () -action `shouldReturnAsync` expected = action >>= await >>= liftIO . (`shouldBe` expected) $(makeRpc $ rpcApi "Example" $ do rpcFunction "fixedHandler42" $ do @@ -96,10 +95,10 @@ spec = parallel $ do describe "Example" $ do it "works" $ do withStandaloneClient @ExampleProtocol exampleProtocolImpl $ \client -> do - (awaitIO =<< fixedHandler42 client 5) `shouldReturn` False - (awaitIO =<< fixedHandler42 client 42) `shouldReturn` True - (awaitIO =<< fixedHandlerInc client 41) `shouldReturn` 42 - (awaitIO =<< multiArgs client 10 3 False) `shouldReturn` (13, True) + (await =<< fixedHandler42 client 5) `shouldReturn` False + (await =<< fixedHandler42 client 42) `shouldReturn` True + (await =<< fixedHandlerInc client 41) `shouldReturn` 42 + (await =<< multiArgs client 10 3 False) `shouldReturn` (13, True) noResponse client 1337 noNothing client @@ -136,40 +135,45 @@ spec = parallel $ do var <- newObservableVar 41 withStandaloneClient @ObservableExampleProtocol (ObservableExampleProtocolImpl (toObservable var)) $ \client -> do + resultVar <- newTVarIO ObservableLoading observable <- intObservable client -- Change the value before calling `observe` setObservableVar var 42 - void $ observe observable $ atomically . writeTVar resultVar - join $ atomically $ readTVar resultVar >>= - \case - ObservableUpdate x -> pure $ x `shouldBe` 42 - ObservableLoading -> retry - ObservableNotAvailable ex -> pure $ throwIO ex + withOnResourceManager do + asyncObserve_ observable $ liftIO . atomically . writeTVar resultVar + + liftIO $ join $ atomically $ readTVar resultVar >>= + \case + ObservableUpdate x -> pure $ x `shouldBe` 42 + ObservableLoading -> retry + ObservableNotAvailable ex -> pure $ throwIO ex it "receives continuous updates when observing" $ do var <- newObservableVar 42 withStandaloneClient @ObservableExampleProtocol (ObservableExampleProtocolImpl (toObservable var)) $ \client -> do resultVar <- newTVarIO ObservableLoading observable <- intObservable client - void $ observe observable $ atomically . writeTVar resultVar - let latestShouldBe = \expected -> join $ atomically $ readTVar resultVar >>= - \case - -- Send and receive are running asynchronously, so this retries until the expected value is received. - -- Blocks forever if the wrong or no value is received. - ObservableUpdate x -> if (x == expected) then pure (pure ()) else retry - ObservableLoading -> retry - ObservableNotAvailable ex -> pure $ throwIO ex - - latestShouldBe 42 - setObservableVar var 13 - latestShouldBe 13 - setObservableVar var (-1) - latestShouldBe (-1) - setObservableVar var 42 - latestShouldBe 42 + withOnResourceManager do + void $ asyncObserve observable $ liftIO . atomically . writeTVar resultVar + + let latestShouldBe = \expected -> liftIO $ join $ atomically $ readTVar resultVar >>= + \case + -- Send and receive are running asynchronously, so this retries until the expected value is received. + -- Blocks forever if the wrong or no value is received. + ObservableUpdate x -> if (x == expected) then pure (pure ()) else retry + ObservableLoading -> retry + ObservableNotAvailable ex -> pure $ throwIO ex + + latestShouldBe 42 + setObservableVar var 13 + latestShouldBe 13 + setObservableVar var (-1) + latestShouldBe (-1) + setObservableVar var 42 + latestShouldBe 42 it "receives no further updates after disposing the callback registration" $ do pendingWith "not implemented" @@ -178,27 +182,28 @@ spec = parallel $ do withStandaloneClient @ObservableExampleProtocol (ObservableExampleProtocolImpl (toObservable var)) $ \client -> do resultVar <- newTVarIO ObservableLoading observable <- intObservable client - disposable <- observe observable $ atomically . writeTVar resultVar - let latestShouldBe = \expected -> join $ atomically $ readTVar resultVar >>= - \case - -- Send and receive are running asynchronously, so this retries until the expected value is received. - -- Blocks forever if the wrong or no value is received. - ObservableUpdate x -> if (x < 0) - then pure (fail "received a message after unsubscribing") - else if (x == expected) then pure (pure ()) else retry - ObservableLoading -> retry - ObservableNotAvailable ex -> pure $ throwIO ex - - latestShouldBe 42 - setObservableVar var 13 - latestShouldBe 13 - setObservableVar var 42 - latestShouldBe 42 - - disposeIO disposable - - setObservableVar var (-1) - threadDelay 10000 - - latestShouldBe 42 - + withOnResourceManager do + disposable <- asyncObserve observable $ liftIO . atomically . writeTVar resultVar + + let latestShouldBe = \expected -> liftIO $ join $ atomically $ readTVar resultVar >>= + \case + -- Send and receive are running asynchronously, so this retries until the expected value is received. + -- Blocks forever if the wrong or no value is received. + ObservableUpdate x -> if (x < 0) + then pure (fail "received a message after unsubscribing") + else if (x == expected) then pure (pure ()) else retry + ObservableLoading -> retry + ObservableNotAvailable ex -> pure $ throwIO ex + + latestShouldBe 42 + setObservableVar var 13 + latestShouldBe 13 + setObservableVar var 42 + latestShouldBe 42 + + disposeAndAwait disposable + + setObservableVar var (-1) + liftIO $ threadDelay 10000 + + latestShouldBe 42 -- GitLab