diff --git a/flake.lock b/flake.lock index ebaa3e957730e4a5ebf5d7034a9dfa754b135c11..ab8fa280910e40e070c2769ea15db57799da55f1 100644 --- a/flake.lock +++ b/flake.lock @@ -21,11 +21,11 @@ }, "locked": { "host": "git.c3pb.de", - "lastModified": 1626105841, - "narHash": "sha256-od9bq8zu30xbbPckIeLW4d1cEOI1EQllOn3qW9qLr1Y=", + "lastModified": 1626923341, + "narHash": "sha256-CWvh6F6d1kEN6IpMvDBxSBNl4oJP2FhRGU5uGLwZSBw=", "owner": "jens", "repo": "quasar", - "rev": "fc85ec26151845dc9e972a7b91bd3828b0132baf", + "rev": "458784d70f664f3af9b98655505ca93e72610376", "type": "gitlab" }, "original": { diff --git a/src/Quasar/Network/Runtime.hs b/src/Quasar/Network/Runtime.hs index 036b3b0a6c7ce3e53cd06e01b7ca494f8fb3a21c..567b878c33c2a0c9b613211dab07e81eeedcfd5d 100644 --- a/src/Quasar/Network/Runtime.hs +++ b/src/Quasar/Network/Runtime.hs @@ -34,19 +34,20 @@ module Quasar.Network.Runtime ( RpcProtocol(..), HasProtocolImpl(..), clientSend, - clientRequestBlocking, + clientRequest, clientReportProtocolError, newStream, ) where import Control.Concurrent (forkFinally) import Control.Concurrent.Async (cancel, link, withAsync, mapConcurrently_) -import Control.Exception (SomeException, bracket, bracketOnError, bracketOnError, interruptible, mask_) +import Control.Exception (bracket, bracketOnError, bracketOnError, interruptible, mask_) import Control.Concurrent.MVar import Data.Binary (Binary, encode, decodeOrFail) import qualified Data.ByteString.Lazy as BSL import qualified Data.HashMap.Strict as HM import qualified Network.Socket as Socket +import Quasar.Core import Quasar.Network.Connection import Quasar.Network.Multiplexer import Quasar.Prelude @@ -78,29 +79,31 @@ emptyClientState = ClientState { callbacks = HM.empty } -clientSend :: RpcProtocol p => Client p -> MessageConfiguration -> ProtocolRequest p -> IO SentMessageResources -clientSend client config req = channelSend_ client.channel config (encode req) -clientRequestBlocking :: forall p. RpcProtocol p => Client p -> MessageConfiguration -> ProtocolRequest p -> IO (ProtocolResponse p, SentMessageResources) -clientRequestBlocking client config req = do - resultMVar <- newEmptyMVar - sentMessageResources <- channelSend client.channel config (encode req) $ \msgId -> +clientSend :: forall p m. (MonadIO m, RpcProtocol p) => Client p -> MessageConfiguration -> ProtocolRequest p -> m SentMessageResources +clientSend client config req = liftIO $ channelSend_ client.channel config (encode req) + +clientRequest :: forall p m a. (MonadIO m, RpcProtocol p) => Client p -> (ProtocolResponse p -> Maybe a) -> MessageConfiguration -> ProtocolRequest p -> m (Async a, SentMessageResources) +clientRequest client checkResponse config req = do + resultAsync <- newAsyncVar + sentMessageResources <- liftIO $ channelSend client.channel config (encode req) $ \msgId -> modifyMVar_ client.stateMVar $ - \state -> pure state{callbacks = HM.insert msgId (requestCompletedCallback resultMVar msgId) state.callbacks} - -- Block on resultMVar until the request completes - -- TODO: Future-based variant - result <- takeMVar resultMVar - pure (result, sentMessageResources) + \state -> pure state{callbacks = HM.insert msgId (requestCompletedCallback resultAsync msgId) state.callbacks} + pure (toAsync resultAsync, sentMessageResources) where - requestCompletedCallback :: MVar (ProtocolResponse p) -> MessageId -> ProtocolResponse p -> IO () - requestCompletedCallback resultMVar msgId response = do + requestCompletedCallback :: AsyncVar a -> MessageId -> ProtocolResponse p -> IO () + requestCompletedCallback resultAsync msgId response = do -- Remove callback modifyMVar_ client.stateMVar $ \state -> pure state{callbacks = HM.delete msgId state.callbacks} - putMVar resultMVar response + + case checkResponse response of + Nothing -> clientReportProtocolError client "Invalid response" + Just result -> putAsyncVar resultAsync result + clientHandleChannelMessage :: forall p. (RpcProtocol p) => Client p -> ReceivedMessageResources -> BSL.ByteString -> IO () clientHandleChannelMessage client resources msg = case decodeOrFail msg of Left (_, _, errMsg) -> channelReportProtocolError client.channel errMsg Right ("", _, resp) -> clientHandleResponse resp - Right (leftovers, _, _) -> channelReportProtocolError client.channel ("Response parser pureed unexpected leftovers: " <> show (BSL.length leftovers)) + Right (leftovers, _, _) -> channelReportProtocolError client.channel ("Response parser returned unexpected leftovers: " <> show (BSL.length leftovers)) where clientHandleResponse :: ProtocolResponseWrapper p -> IO () clientHandleResponse (requestId, resp) = do @@ -136,29 +139,29 @@ serverHandleChannelMessage protocolImpl channel resources msg = case decodeOrFai newtype Stream up down = Stream Channel -newStream :: Channel -> IO (Stream up down) -newStream = pure . Stream +newStream :: MonadIO m => Channel -> m (Stream up down) +newStream = liftIO . pure . Stream -streamSend :: Binary up => Stream up down -> up -> IO () -streamSend (Stream channel) value = channelSendSimple channel (encode value) +streamSend :: (Binary up, MonadIO m) => Stream up down -> up -> m () +streamSend (Stream channel) value = liftIO $ channelSendSimple channel (encode value) -streamSetHandler :: Binary down => Stream up down -> (down -> IO ()) -> IO () -streamSetHandler (Stream channel) handler = channelSetSimpleHandler channel handler +streamSetHandler :: (Binary down, MonadIO m) => Stream up down -> (down -> IO ()) -> m () +streamSetHandler (Stream channel) handler = liftIO $ channelSetSimpleHandler channel handler -streamClose :: Stream up down -> IO () -streamClose (Stream channel) = channelClose channel +streamClose :: MonadIO m => Stream up down -> m () +streamClose (Stream channel) = liftIO $ channelClose channel -- ** Running client and server -withClientTCP :: RpcProtocol p => Socket.HostName -> Socket.ServiceName -> (Client p -> IO a) -> IO a -withClientTCP host port = bracket (newClientTCP host port) clientClose +withClientTCP :: RpcProtocol p => Socket.HostName -> Socket.ServiceName -> (Client p -> AsyncIO a) -> IO a +withClientTCP host port = withClientBracket (newClientTCP host port) newClientTCP :: forall p. RpcProtocol p => Socket.HostName -> Socket.ServiceName -> IO (Client p) newClientTCP host port = newClient =<< connectTCP host port -withClientUnix :: RpcProtocol p => FilePath -> (Client p -> IO a) -> IO a -withClientUnix socketPath = bracket (newClientUnix socketPath) clientClose +withClientUnix :: RpcProtocol p => FilePath -> (Client p -> AsyncIO a) -> IO a +withClientUnix socketPath = withClientBracket (newClientUnix socketPath) newClientUnix :: RpcProtocol p => FilePath -> IO (Client p) newClientUnix socketPath = bracketOnError (Socket.socket Socket.AF_UNIX Socket.Stream Socket.defaultProtocol) Socket.close $ \sock -> do @@ -167,11 +170,14 @@ newClientUnix socketPath = bracketOnError (Socket.socket Socket.AF_UNIX Socket.S newClient sock -withClient :: forall p a b. (IsConnection a, RpcProtocol p) => a -> (Client p -> IO b) -> IO b -withClient x = bracket (newClient x) clientClose +withClient :: forall p a b. (IsConnection a, RpcProtocol p) => a -> (Client p -> AsyncIO b) -> IO b +withClient connection = withClientBracket (newClient connection) newClient :: forall p a. (IsConnection a, RpcProtocol p) => a -> IO (Client p) -newClient x = newChannelClient =<< newMultiplexer MultiplexerSideA (toSocketConnection x) +newClient connection = newChannelClient =<< newMultiplexer MultiplexerSideA (toSocketConnection connection) + +withClientBracket :: forall p a. (RpcProtocol p) => IO (Client p) -> (Client p -> AsyncIO a) -> IO a +withClientBracket createClient action = bracket createClient clientClose $ \client -> runAsyncIO (action client) newChannelClient :: RpcProtocol p => Channel -> IO (Client p) @@ -287,8 +293,8 @@ runServerHandler protocolImpl = runMultiplexer MultiplexerSideB registerChannelS registerChannelServerHandler channel = channelSetHandler channel (serverHandleChannelMessage @p protocolImpl channel) -withLocalClient :: forall p a. (RpcProtocol p, HasProtocolImpl p) => Server p -> ((Client p) -> IO a) -> IO a -withLocalClient server = bracket (newLocalClient server) clientClose +withLocalClient :: forall p a. (RpcProtocol p, HasProtocolImpl p) => Server p -> ((Client p) -> AsyncIO a) -> IO a +withLocalClient server action = bracket (newLocalClient server) clientClose $ \client -> runAsyncIO (action client) newLocalClient :: forall p. (RpcProtocol p, HasProtocolImpl p) => Server p -> IO (Client p) newLocalClient server = do @@ -300,5 +306,5 @@ newLocalClient server = do -- ** Test implementation -withStandaloneClient :: forall p a. (RpcProtocol p, HasProtocolImpl p) => ProtocolImpl p -> (Client p -> IO a) -> IO a +withStandaloneClient :: forall p a. (RpcProtocol p, HasProtocolImpl p) => ProtocolImpl p -> (Client p -> AsyncIO a) -> IO a withStandaloneClient impl runClientHook = withServer impl [] $ \server -> withLocalClient server runClientHook diff --git a/src/Quasar/Network/TH.hs b/src/Quasar/Network/TH.hs index 0553df791ee68f59d65ab39bce2893c4de29fd35..e25c7bef7c4cc78401adc6a5a97425cfe0e0f3f8 100644 --- a/src/Quasar/Network/TH.hs +++ b/src/Quasar/Network/TH.hs @@ -23,6 +23,7 @@ import Data.Maybe (isNothing) import GHC.Records.Compat (HasField) import Language.Haskell.TH hiding (interruptible) import Language.Haskell.TH.Syntax +import Quasar.Core import Quasar.Network.Multiplexer import Quasar.Network.Runtime import Quasar.Prelude @@ -119,7 +120,8 @@ makeProtocol api code = sequence [protocolDec, protocolInstanceDec, requestDec, serializableTypeDerivClauses :: [Q DerivClause] serializableTypeDerivClauses = [ - derivClause Nothing [[t|Eq|], [t|Show|], [t|Generic|], [t|Binary|]] + derivClause (Just StockStrategy) [[t|Eq|], [t|Show|], [t|Generic|]], + derivClause (Just AnyclassStrategy) [[t|Binary|]] ] makeClient :: Code -> Q [Dec] @@ -131,14 +133,6 @@ makeServer api@RpcApi{functions} code = sequence [protocolImplDec, logicInstance protocolImplDec :: Q Dec protocolImplDec = do dataD (pure []) (implTypeName api) [] Nothing [recC (implTypeName api) code.serverImplFields] [] - functionImplType :: RpcFunction -> Q Type - functionImplType fun = do - argumentTypes <- functionArgumentTypes fun - streamTypes <- serverStreamTypes - buildFunctionType (pure (argumentTypes <> streamTypes)) [t|IO $(buildTupleType (functionResultTypes fun))|] - where - serverStreamTypes :: Q [Type] - serverStreamTypes = sequence $ (\stream -> [t|Stream $(stream.tyDown) $(stream.tyUp)|]) <$> fun.streams logicInstanceDec :: Q Dec logicInstanceDec = instanceD (cxt []) [t|HasProtocolImpl $(protocolType api)|] [ @@ -245,7 +239,7 @@ generateFunction api fun = do clientStubDecs, serverImplFields = if isNothing fun.fixedHandler - then [ varDefaultBangType implFieldName implSig ] + then [varDefaultBangType implFieldName implSig] else [], requests = [request] } @@ -263,7 +257,6 @@ generateFunction api fun = do name = fun.name, -- TODO unpack? fields = [ Field { name = "packedResponse", ty = buildTupleType (sequence ((.ty) <$> fun.results)) } ] - --numCreatedChannels = undefined } implFieldName :: Name implFieldName = functionImplFieldName api fun @@ -277,7 +270,7 @@ generateFunction api fun = do serverStreamTypes = sequence $ (\stream -> [t|Stream $(stream.tyDown) $(stream.tyUp)|]) <$> fun.streams serverRequestHandlerE :: RequestHandlerContext -> Q Exp - serverRequestHandlerE ctx = applyChannels ctx.channelEs (applyArgs (implFieldE ctx.implRecordE)) + serverRequestHandlerE ctx = applyChannels (applyArgs (implFieldE ctx.implRecordE)) ctx.channelEs where implFieldE :: Q Exp -> Q Exp implFieldE implRecordE = case fun.fixedHandler of @@ -285,41 +278,34 @@ generateFunction api fun = do Just handler -> [|$(handler) :: $implSig|] applyArgs :: Q Exp -> Q Exp applyArgs implE = foldl appE implE ctx.argumentEs - applyChannels :: [Q Exp] -> Q Exp -> Q Exp - applyChannels [] implE = implE - applyChannels (channel0E:channelEs) implE = varE 'join `appE` foldl - (\x y -> [|$x <*> $y|]) - ([|$implE <$> $(createStream channel0E)|]) - (createStream <$> channelEs) + applyChannels :: Q Exp -> [Q Exp] -> Q Exp + applyChannels implE channelsEs = varE 'join `appE` applyM implE (createStream <$> channelsEs) where createStream :: Q Exp -> Q Exp createStream = (varE 'newStream `appE`) clientFunctionStub :: Q [Q Dec] clientFunctionStub = do + clientStubPrimeName <- newName fun.name clientVarName <- newName "client" argNames <- sequence (newName . (.name) <$> fun.arguments) - channelNames <- sequence (newName . (<> "Channel") . (.name) <$> fun.streams) - streamNames <- sequence (newName . (.name) <$> fun.streams) - makeClientFunction' clientVarName argNames channelNames streamNames + makeClientFunction' clientStubPrimeName clientVarName argNames where funName :: Name funName = mkName fun.name - makeClientFunction' :: Name -> [Name] -> [Name] -> [Name] -> Q [Q Dec] - makeClientFunction' clientVarName argNames channelNames streamNames = do + makeClientFunction' :: Name -> Name -> [Name] -> Q [Q Dec] + makeClientFunction' clientStubPrimeName clientVarName argNames = do funArgTypes <- functionArgumentTypes fun clientType <- [t|Client $(protocolType api)|] - resultType <- optionalResultType - streamTypes <- clientStreamTypes pure [ - sigD funName (buildFunctionType (pure ([clientType] <> funArgTypes)) [t|IO $(buildTupleType (pure (resultType <> streamTypes)))|]), - funD funName [clause ([varP clientVarName] <> varPats) body []] + sigD funName (makeStubSig (pure (clientType : funArgTypes))), + funD funName [clause ([varP clientVarName] <> varPats) body clientStubPrimeDecs] ] where + makeStubSig :: Q [Type] -> Q Type + makeStubSig arguments = buildFunctionType arguments [t|AsyncIO $(buildTupleType (liftA2 (<>) optionalResultType clientStreamTypes))|] optionalResultType :: Q [Type] - optionalResultType - | hasResult fun = (\x -> [x]) <$> buildTupleType (functionResultTypes fun) - | otherwise = pure [] + optionalResultType = sequence $ whenHasResult [t|Async $(buildTupleType (functionResultTypes fun))|] clientStreamTypes :: Q [Type] clientStreamTypes = sequence $ (\stream -> [t|Stream $(stream.tyUp) $(stream.tyDown)|]) <$> fun.streams clientE :: Q Exp @@ -328,69 +314,65 @@ generateFunction api fun = do varPats = varP <$> argNames body :: Q Body body - | hasResult fun = do - responseName <- newName "response" - normalB $ doE $ - [ - bindS [p|($(varP responseName), resources)|] (requestE requestDataE), - bindS [p|result|] (checkResult (varE responseName)) - ] <> - createStreams [|resources.createdChannels|] <> - [noBindS [|pure $(buildTuple (liftA2 (:) [|result|] streamsE))|]] - | otherwise = normalB $ doE $ - [bindS [p|resources|] (sendE requestDataE)] <> - createStreams [|resources.createdChannels|] <> - [noBindS [|pure $(buildTuple streamsE)|]] + | hasResult fun = normalB ([|$(requestE requestDataE) >>= \(result, resources) -> $(varE clientStubPrimeName) result resources.createdChannels|]) + | otherwise = normalB ([|$(sendE requestDataE) >>= \resources -> $(varE clientStubPrimeName) resources.createdChannels|]) + clientStubPrimeDecs :: [Q Dec] + clientStubPrimeDecs = [ + sigD clientStubPrimeName (makeStubSig (liftA2 (<>) optionalResultType (sequence [[t|[Channel]|]]))), + funD clientStubPrimeName (clientStubPrimeClauses request) + ] + clientStubPrimeClauses :: Request -> [Q Clause] + clientStubPrimeClauses req = [mainClause, invalidChannelCountClause] + where + mainClause :: Q Clause + mainClause = do + resultAsyncName <- newName "result" + + channelNames <- sequence $ newName . ("channel" <>) . show <$> [0 .. (req.numPipelinedChannels - 1)] + + clause + (whenHasResult (varP resultAsyncName) <> [listP (varP <$> channelNames)]) + (normalB (buildTupleM (sequence (whenHasResult [|pure $(varE resultAsyncName)|] <> ((\x -> [|newStream $(varE x)|]) <$> channelNames))))) + [] + + invalidChannelCountClause :: Q Clause + invalidChannelCountClause = do + channelsName <- newName "newChannels" + clause + (whenHasResult wildP <> [varP channelsName]) + (normalB [|$(varE 'multiplexerInvalidChannelCount) $(litE (integerL (toInteger req.numPipelinedChannels))) $(varE channelsName)|]) + [] + whenHasResult :: a -> [a] + whenHasResult x = if hasResult fun then [x] else [] requestDataE :: Q Exp requestDataE = applyVars (conE (requestFunctionConName api fun)) - createStreams :: Q Exp -> [Q Stmt] - createStreams channelsE = if length fun.streams > 0 then [assignChannels] <> go channelNames streamNames else [verifyNoChannels] - where - verifyNoChannels :: Q Stmt - verifyNoChannels = noBindS [|unless (null $(channelsE)) (fail "Invalid number of channel created")|] - assignChannels :: Q Stmt - assignChannels = - bindS - (tupP (varP <$> channelNames)) - $ caseE channelsE [ - match (listP (varP <$> channelNames)) (normalB [|pure $(tupE (varE <$> channelNames))|]) [], - match wildP (normalB [|fail "Invalid number of channel created"|]) [] - ] - go :: [Name] -> [Name] -> [Q Stmt] - go [] [] = [] - go (cn:cns) (sn:sns) = createStream cn sn : go cns sns - go _ _ = fail "Logic error: lists have different lengths" - createStream :: Name -> Name -> Q Stmt - createStream channelName streamName = bindS (varP streamName) [|newStream $(varE channelName)|] - streamsE :: Q [Exp] - streamsE = mapM varE streamNames messageConfigurationE :: Q Exp messageConfigurationE = [|defaultMessageConfiguration{createChannels = $(litE $ integerL $ toInteger $ length fun.streams)}|] sendE :: Q Exp -> Q Exp sendE msgExp = [|$typedSend $clientE $messageConfigurationE $msgExp|] requestE :: Q Exp -> Q Exp - requestE msgExp = [|$typedRequest $clientE $messageConfigurationE $msgExp|] + requestE msgExp = [|$typedRequest $clientE $checkResult $messageConfigurationE $msgExp|] applyVars :: Q Exp -> Q Exp applyVars = go argNames where go :: [Name] -> Q Exp -> Q Exp go [] ex = ex go (n:ns) ex = go ns (appE ex (varE n)) - -- check if the response to a request matches the expected result constructor - checkResult :: Q Exp -> Q Exp - checkResult x = caseE x [valid, invalid] + -- check if the response to a request matches the expected response constructor + checkResult :: Q Exp + checkResult = lamCaseE [valid, invalid] where valid :: Q Match valid = do result <- newName "result" match (conP (responseFunctionCtorName api fun) [varP result]) (normalB [|pure $(varE result)|]) [] invalid :: Q Match - invalid = match wildP (normalB [|$(varE 'clientReportProtocolError) $clientE "TODO"|]) [] + invalid = match wildP (normalB [|Nothing|]) [] typedSend :: Q Exp - typedSend = appTypeE [|clientSend|] (protocolType api) + typedSend = appTypeE (varE 'clientSend) (protocolType api) typedRequest :: Q Exp - typedRequest = appTypeE (varE 'clientRequestBlocking) (protocolType api) + typedRequest = appTypeE (varE 'clientRequest) (protocolType api) functionArgumentTypes :: RpcFunction -> Q [Type] functionArgumentTypes fun = sequence $ (.ty) <$> fun.arguments @@ -459,6 +441,7 @@ buildTupleType fields = buildTupleType' =<< fields go t [] = t go t (f:fs) = go (AppT t f) fs +-- | [a, b, c] -> (a, b, c) buildTuple :: Q [Exp] -> Q Exp buildTuple fields = buildTuple' =<< fields where @@ -467,6 +450,20 @@ buildTuple fields = buildTuple' =<< fields buildTuple' [single] = pure single buildTuple' fs = pure $ TupE (Just <$> fs) +-- | [m a, m b, m c] -> m (a, b, c) +buildTupleM :: Q [Exp] -> Q Exp +buildTupleM fields = buildTuple' =<< fields + where + buildTuple' :: [Exp] -> Q Exp + buildTuple' [] = [|pure ()|] + buildTuple' [single] = pure single + buildTuple' fs = pure (TupE (const Nothing <$> fs)) `applyM` (pure <$> fs) + +-- | (a -> b -> c -> d) -> [m a, m b, m c] -> m d +applyM :: Q Exp -> [Q Exp] -> Q Exp +applyM con [] = [|pure $con|] +applyM con (monadicE:monadicEs) = foldl (\x y -> [|$x <*> $y|]) [|$con <$> $monadicE|] monadicEs + buildFunctionType :: Q [Type] -> Q Type -> Q Type buildFunctionType argTypes returnType = go =<< argTypes where @@ -480,8 +477,6 @@ defaultBangType = bangType (bang noSourceUnpackedness noSourceStrictness) varDefaultBangType :: Name -> Q Type -> Q VarBangType varDefaultBangType name qType = varBangType name $ bangType (bang noSourceUnpackedness noSourceStrictness) qType -fmapE :: Q Exp -> Q Exp -> Q Exp -fmapE f e = [|$(f) <$> $(e)|] -- * Error reporting @@ -490,3 +485,9 @@ reportInvalidChannelCount expectedCount newChannels onChannel = channelReportPro where msg = mconcat parts parts = ["Received ", show (length newChannels), " new channels, but expected ", show expectedCount] + +multiplexerInvalidChannelCount :: MonadIO m => Int -> [Channel] -> m a +multiplexerInvalidChannelCount expectedCount newChannels = liftIO $ fail msg + where + msg = mconcat parts + parts = ["Internal error: Multiplexer created ", show (length newChannels), " new channels, but expected ", show expectedCount] diff --git a/test/Quasar/NetworkSpec.hs b/test/Quasar/NetworkSpec.hs index 93062f966e8fd2d32286b448f8cf36a61f5d6b1a..abbeaaced33b7490bba661d39ae2d479cfdaf1c9 100644 --- a/test/Quasar/NetworkSpec.hs +++ b/test/Quasar/NetworkSpec.hs @@ -13,6 +13,7 @@ module Quasar.NetworkSpec where import Control.Concurrent.MVar import Control.Monad.IO.Class (liftIO) import Prelude +import Quasar.Core import Quasar.Network import Quasar.Network.Runtime (withStandaloneClient) import Quasar.Network.TH (makeRpc) @@ -20,6 +21,10 @@ import Test.Hspec import Test.QuickCheck import Test.QuickCheck.Monadic +shouldReturnAsync :: (HasCallStack, Show a, Eq a) => AsyncIO a -> a -> AsyncIO () +action `shouldReturnAsync` expected = action >>= liftIO . (`shouldBe` expected) + + $(makeRpc $ rpcApi "Example" [ rpcFunction "fixedHandler42" $ do addArgument "arg" [t|Int|] @@ -79,10 +84,10 @@ spec = parallel $ do describe "Example" $ do it "works" $ do withStandaloneClient @ExampleProtocol exampleProtocolImpl $ \client -> do - fixedHandler42 client 5 `shouldReturn` False - fixedHandler42 client 42 `shouldReturn` True - fixedHandlerInc client 41 `shouldReturn` 42 - multiArgs client 10 3 False `shouldReturn` (13, True) + awaitResult (fixedHandler42 client 5) `shouldReturnAsync` False + awaitResult (fixedHandler42 client 42) `shouldReturnAsync` True + awaitResult (fixedHandlerInc client 41) `shouldReturnAsync` 42 + awaitResult (multiArgs client 10 3 False) `shouldReturnAsync` (13, True) noResponse client 1337 noNothing client @@ -98,10 +103,10 @@ spec = parallel $ do streamClose stream2 aroundAll (\x -> withStandaloneClient @StreamExampleProtocol streamExampleProtocolImpl $ \client -> do - resultMVar <- newEmptyMVar + resultMVar <- liftIO newEmptyMVar stream <- createMultiplyStream client streamSetHandler stream $ putMVar resultMVar - x (resultMVar, stream) + liftIO $ x (resultMVar, stream) ) $ it "can send data over the stream" $ \(resultMVar, stream) -> property $ \(x, y) -> monadicIO $ do liftIO $ streamSend stream (x, y) liftIO $ takeMVar resultMVar `shouldReturn` x * y