diff --git a/flake.lock b/flake.lock index c5dd0da8c3cc576d099f6fb0f61f012b278cf526..49b7e0a1dd69a6575e7cbd14adb14926c33dc199 100644 --- a/flake.lock +++ b/flake.lock @@ -21,11 +21,11 @@ }, "locked": { "host": "git.c3pb.de", - "lastModified": 1629328844, - "narHash": "sha256-4AiAjuSuTneWidipxxTu60xqztbum8OuTuKI9pWpUsQ=", + "lastModified": 1630286998, + "narHash": "sha256-2rGt7QPAHXuHP8mn/vxzXJotrGcv93jyQTZTPrwZ1w8=", "owner": "jens", "repo": "quasar", - "rev": "67efa0e198ac78aaaf308c2067019574548acf7c", + "rev": "499e025611d10540dbebf0e2732a0316a6e111bd", "type": "gitlab" }, "original": { diff --git a/src/Quasar/Network/Multiplexer.hs b/src/Quasar/Network/Multiplexer.hs index 445f51264f02a3e7ff0093cc9d42f419c57d0cac..999963a720f8d0c402a4999ece98183585ab4a8a 100644 --- a/src/Quasar/Network/Multiplexer.hs +++ b/src/Quasar/Network/Multiplexer.hs @@ -107,12 +107,17 @@ instance Exception MultiplexerException data Channel = Channel { channelId :: ChannelId, + resourceManager :: ResourceManager, worker :: MultiplexerWorker, stateMVar :: MVar ChannelState, sendStateMVar :: MVar ChannelSendState, receiveStateMVar :: MVar ChannelReceiveState, handlerAtVar :: AtVar InternalChannelMessageHandler } + +instance HasResourceManager Channel where + getResourceManager channel = channel.resourceManager + data ChannelState = ChannelState { connectionState :: ChannelConnectivity, children :: [Channel] diff --git a/src/Quasar/Network/Runtime.hs b/src/Quasar/Network/Runtime.hs index c94ef6e826bc1be7f2963531d83442b55eff4459..9a57e1b003bdce9d6e09e9e69312fe1765e2ae0e 100644 --- a/src/Quasar/Network/Runtime.hs +++ b/src/Quasar/Network/Runtime.hs @@ -47,8 +47,9 @@ import Data.Binary (Binary, encode, decodeOrFail) import qualified Data.ByteString.Lazy as BSL import qualified Data.HashMap.Strict as HM import qualified Network.Socket as Socket +import Quasar.Async import Quasar.Awaitable -import Quasar.Core +import Quasar.Disposable import Quasar.Network.Connection import Quasar.Network.Multiplexer import Quasar.Prelude @@ -65,7 +66,7 @@ type ProtocolResponseWrapper p = (MessageId, ProtocolResponse p) class RpcProtocol p => HasProtocolImpl p where type ProtocolImpl p - handleRequest :: HasResourceManager m => ProtocolImpl p -> Channel -> ProtocolRequest p -> [Channel] -> m (Maybe (Task (ProtocolResponse p))) + handleRequest :: MonadAsync m => ProtocolImpl p -> Channel -> ProtocolRequest p -> [Channel] -> m (Maybe (Task (ProtocolResponse p))) data Client p = Client { @@ -131,8 +132,7 @@ serverHandleChannelMessage protocolImpl channel resources msg = case decodeOrFai where serverHandleChannelRequest :: [Channel] -> ProtocolRequest p -> IO () serverHandleChannelRequest channels req = do - -- TODO resource manager should belong to the current channel/api - withDefaultResourceManager $ + onResourceManager channel $ handleRequest @p protocolImpl channel req channels >>= \case Nothing -> pure () Just task -> do diff --git a/src/Quasar/Network/Runtime/Observable.hs b/src/Quasar/Network/Runtime/Observable.hs index 3a4758c7d6eb717a415c4d008eebcb69d0358620..18e48e5af654ea048d5269a8a5535c6d9fc29317 100644 --- a/src/Quasar/Network/Runtime/Observable.hs +++ b/src/Quasar/Network/Runtime/Observable.hs @@ -5,8 +5,9 @@ module Quasar.Network.Runtime.Observable ( ) where import Data.Binary (Binary) +import Quasar.Async import Quasar.Awaitable -import Quasar.Core +import Quasar.Disposable import Quasar.Network.Exception import Quasar.Network.Runtime import Quasar.Observable @@ -44,11 +45,10 @@ newObservableStub startRetrieveRequest startObserveRequest = pure uncachedObserv stream <- startObserveRequest streamSetHandler stream (callback . unpackObservableMessage) synchronousDisposable $ streamClose stream - retrieveFn :: forall m. HasResourceManager m => m (Task v) + retrieveFn :: forall m. MonadAsync m => m (Task v) retrieveFn = toTask <$> startRetrieveRequest -observeToStream :: (Binary v, HasResourceManager m) => Observable v -> Stream (PackedObservableMessage v) Void -> m () -observeToStream observable stream = do - _disposable <- liftIO $ observe observable $ streamSend stream . packObservableMessage - -- TODO: dispose when the stream is closed - pure () +observeToStream :: (Binary v, MonadAsync m) => Observable v -> Stream (PackedObservableMessage v) Void -> m () +observeToStream observable stream = + asyncObserve_ observable \msg -> + streamSend stream $ packObservableMessage msg diff --git a/src/Quasar/Network/TH.hs b/src/Quasar/Network/TH.hs index 42a83d73c1ae9d8aa9179fab85781750baf782b1..3ac952fff111d86c0529fd8881d1bd851478a5d8 100644 --- a/src/Quasar/Network/TH.hs +++ b/src/Quasar/Network/TH.hs @@ -24,8 +24,8 @@ import Data.Maybe (isJust, isNothing) import GHC.Records.Compat (HasField) import Language.Haskell.TH hiding (interruptible) import Language.Haskell.TH.Syntax +import Quasar.Async import Quasar.Awaitable -import Quasar.Core import Quasar.Network.Multiplexer import Quasar.Network.Runtime import Quasar.Network.Runtime.Observable @@ -559,7 +559,7 @@ createResource RequestCreateChannel channelE = [|pure $channelE|] createResource (RequestCreateStream up down) channelE = [|newStream $channelE|] implResultType :: Request -> Q Type -implResultType req = [t|forall m. HasResourceManager m => m $(resultType)|] +implResultType req = [t|forall m. MonadAsync m => m $(resultType)|] where resultType = case req.mResponse of Nothing -> [t|()|] diff --git a/test/Quasar/NetworkSpec.hs b/test/Quasar/NetworkSpec.hs index daf81e513778deb9f9766b87ccabd9116bce5c59..465d1d34ebc0c10f69c8e94345e9a331b11c5b85 100644 --- a/test/Quasar/NetworkSpec.hs +++ b/test/Quasar/NetworkSpec.hs @@ -16,8 +16,9 @@ import Control.Concurrent.STM import Control.Exception (toException) import Control.Monad.IO.Class (MonadIO, liftIO) import Quasar.Prelude +import Quasar.Async import Quasar.Awaitable -import Quasar.Core +import Quasar.Disposable import Quasar.Network import Quasar.Network.Runtime (withStandaloneClient) import Quasar.Network.TH (makeRpc) @@ -26,8 +27,6 @@ import Test.Hspec import Test.QuickCheck import Test.QuickCheck.Monadic -shouldReturnAsync :: (HasCallStack, IsAwaitable r a, Show r, Eq r) => AsyncIO a -> r -> AsyncIO () -action `shouldReturnAsync` expected = action >>= await >>= liftIO . (`shouldBe` expected) $(makeRpc $ rpcApi "Example" $ do rpcFunction "fixedHandler42" $ do @@ -96,10 +95,10 @@ spec = parallel $ do describe "Example" $ do it "works" $ do withStandaloneClient @ExampleProtocol exampleProtocolImpl $ \client -> do - (awaitIO =<< fixedHandler42 client 5) `shouldReturn` False - (awaitIO =<< fixedHandler42 client 42) `shouldReturn` True - (awaitIO =<< fixedHandlerInc client 41) `shouldReturn` 42 - (awaitIO =<< multiArgs client 10 3 False) `shouldReturn` (13, True) + (await =<< fixedHandler42 client 5) `shouldReturn` False + (await =<< fixedHandler42 client 42) `shouldReturn` True + (await =<< fixedHandlerInc client 41) `shouldReturn` 42 + (await =<< multiArgs client 10 3 False) `shouldReturn` (13, True) noResponse client 1337 noNothing client @@ -136,40 +135,45 @@ spec = parallel $ do var <- newObservableVar 41 withStandaloneClient @ObservableExampleProtocol (ObservableExampleProtocolImpl (toObservable var)) $ \client -> do + resultVar <- newTVarIO ObservableLoading observable <- intObservable client -- Change the value before calling `observe` setObservableVar var 42 - void $ observe observable $ atomically . writeTVar resultVar - join $ atomically $ readTVar resultVar >>= - \case - ObservableUpdate x -> pure $ x `shouldBe` 42 - ObservableLoading -> retry - ObservableNotAvailable ex -> pure $ throwIO ex + withOnResourceManager do + asyncObserve_ observable $ liftIO . atomically . writeTVar resultVar + + liftIO $ join $ atomically $ readTVar resultVar >>= + \case + ObservableUpdate x -> pure $ x `shouldBe` 42 + ObservableLoading -> retry + ObservableNotAvailable ex -> pure $ throwIO ex it "receives continuous updates when observing" $ do var <- newObservableVar 42 withStandaloneClient @ObservableExampleProtocol (ObservableExampleProtocolImpl (toObservable var)) $ \client -> do resultVar <- newTVarIO ObservableLoading observable <- intObservable client - void $ observe observable $ atomically . writeTVar resultVar - let latestShouldBe = \expected -> join $ atomically $ readTVar resultVar >>= - \case - -- Send and receive are running asynchronously, so this retries until the expected value is received. - -- Blocks forever if the wrong or no value is received. - ObservableUpdate x -> if (x == expected) then pure (pure ()) else retry - ObservableLoading -> retry - ObservableNotAvailable ex -> pure $ throwIO ex - - latestShouldBe 42 - setObservableVar var 13 - latestShouldBe 13 - setObservableVar var (-1) - latestShouldBe (-1) - setObservableVar var 42 - latestShouldBe 42 + withOnResourceManager do + void $ asyncObserve observable $ liftIO . atomically . writeTVar resultVar + + let latestShouldBe = \expected -> liftIO $ join $ atomically $ readTVar resultVar >>= + \case + -- Send and receive are running asynchronously, so this retries until the expected value is received. + -- Blocks forever if the wrong or no value is received. + ObservableUpdate x -> if (x == expected) then pure (pure ()) else retry + ObservableLoading -> retry + ObservableNotAvailable ex -> pure $ throwIO ex + + latestShouldBe 42 + setObservableVar var 13 + latestShouldBe 13 + setObservableVar var (-1) + latestShouldBe (-1) + setObservableVar var 42 + latestShouldBe 42 it "receives no further updates after disposing the callback registration" $ do pendingWith "not implemented" @@ -178,27 +182,28 @@ spec = parallel $ do withStandaloneClient @ObservableExampleProtocol (ObservableExampleProtocolImpl (toObservable var)) $ \client -> do resultVar <- newTVarIO ObservableLoading observable <- intObservable client - disposable <- observe observable $ atomically . writeTVar resultVar - let latestShouldBe = \expected -> join $ atomically $ readTVar resultVar >>= - \case - -- Send and receive are running asynchronously, so this retries until the expected value is received. - -- Blocks forever if the wrong or no value is received. - ObservableUpdate x -> if (x < 0) - then pure (fail "received a message after unsubscribing") - else if (x == expected) then pure (pure ()) else retry - ObservableLoading -> retry - ObservableNotAvailable ex -> pure $ throwIO ex - - latestShouldBe 42 - setObservableVar var 13 - latestShouldBe 13 - setObservableVar var 42 - latestShouldBe 42 - - disposeIO disposable - - setObservableVar var (-1) - threadDelay 10000 - - latestShouldBe 42 - + withOnResourceManager do + disposable <- asyncObserve observable $ liftIO . atomically . writeTVar resultVar + + let latestShouldBe = \expected -> liftIO $ join $ atomically $ readTVar resultVar >>= + \case + -- Send and receive are running asynchronously, so this retries until the expected value is received. + -- Blocks forever if the wrong or no value is received. + ObservableUpdate x -> if (x < 0) + then pure (fail "received a message after unsubscribing") + else if (x == expected) then pure (pure ()) else retry + ObservableLoading -> retry + ObservableNotAvailable ex -> pure $ throwIO ex + + latestShouldBe 42 + setObservableVar var 13 + latestShouldBe 13 + setObservableVar var 42 + latestShouldBe 42 + + disposeAndAwait disposable + + setObservableVar var (-1) + liftIO $ threadDelay 10000 + + latestShouldBe 42