Skip to content
Snippets Groups Projects
Commit 1c2ec4f4 authored by Jens Nolte's avatar Jens Nolte
Browse files

Implement basic multiplexer message send and receive


Co-authored-by: default avatarJan Beinke <git@janbeinke.com>
parent 195d90a6
No related branches found
No related tags found
No related merge requests found
Pipeline #2541 failed
......@@ -96,6 +96,7 @@ library
async,
binary,
bytestring,
deepseq,
exceptions,
mtl,
network,
......@@ -124,7 +125,6 @@ test-suite quasar-network-test
hspec-core,
hspec-expectations-lifted,
mtl,
network,
quasar,
quasar-network,
stm,
......
This diff is collapsed.
......@@ -40,6 +40,7 @@ module Quasar.Network.Runtime (
) where
import Control.Concurrent.MVar
import Control.Concurrent.STM
import Control.Monad.Catch
import Data.Binary (Binary, encode, decodeOrFail)
import Data.ByteString.Lazy qualified as BSL
......@@ -70,36 +71,26 @@ class RpcProtocol p => HasProtocolImpl p where
data Client p = Client {
channel :: Channel,
stateMVar :: MVar (ClientState p)
callbacksVar :: TVar (HM.HashMap MessageId (ProtocolResponse p -> IO ()))
}
instance IsDisposable (Client p) where
toDisposable client = toDisposable client.channel
newtype ClientState p = ClientState {
callbacks :: HM.HashMap MessageId (ProtocolResponse p -> IO ())
}
emptyClientState :: ClientState p
emptyClientState = ClientState {
callbacks = HM.empty
}
clientSend :: forall p m. (MonadIO m, RpcProtocol p) => Client p -> MessageConfiguration -> ProtocolRequest p -> m SentMessageResources
clientSend client config req = liftIO $ channelSend_ client.channel config (encode req)
clientRequest :: forall p m a. (MonadIO m, RpcProtocol p) => Client p -> (ProtocolResponse p -> Maybe a) -> MessageConfiguration -> ProtocolRequest p -> m (Awaitable a, SentMessageResources)
clientRequest client checkResponse config req = do
resultAsync <- newAsyncVar
sentMessageResources <- liftIO $ channelSend client.channel config (encode req) $ \msgId ->
modifyMVar_ client.stateMVar $
\state -> pure state{callbacks = HM.insert msgId (requestCompletedCallback resultAsync msgId) state.callbacks}
sentMessageResources <- liftIO $ channelSend client.channel config (encode req) \msgId ->
modifyTVar client.callbacksVar $ HM.insert msgId (requestCompletedCallback resultAsync msgId)
pure (toAwaitable resultAsync, sentMessageResources)
where
requestCompletedCallback :: AsyncVar a -> MessageId -> ProtocolResponse p -> IO ()
requestCompletedCallback resultAsync msgId response = do
-- Remove callback
modifyMVar_ client.stateMVar $ \state -> pure state{callbacks = HM.delete msgId state.callbacks}
atomically $ modifyTVar client.callbacksVar $ HM.delete msgId
case checkResponse response of
Nothing -> clientReportProtocolError client "Invalid response"
......@@ -116,12 +107,11 @@ clientHandleChannelMessage client resources msg = liftIO do
clientHandleResponse :: ProtocolResponseWrapper p -> IO ()
clientHandleResponse (requestId, resp) = do
unless (null resources.createdChannels) (channelReportProtocolError client.channel "Received unexpected new channel during a rpc response")
callback <- modifyMVar client.stateMVar $ \state -> do
let (callbacks, mCallback) = lookupDelete requestId state.callbacks
join $ atomically $ stateTVar client.callbacksVar $ \oldCallbacks -> do
let (callbacks, mCallback) = lookupDelete requestId oldCallbacks
case mCallback of
Just callback -> pure (state{callbacks}, callback)
Nothing -> channelReportProtocolError client.channel ("Received response with invalid request id " <> show requestId)
callback resp
Just callback -> (callback resp, callbacks)
Nothing -> (channelReportProtocolError client.channel ("Received response with invalid request id " <> show requestId), callbacks)
clientReportProtocolError :: Client p -> String -> IO a
clientReportProtocolError client = channelReportProtocolError client.channel
......@@ -203,10 +193,10 @@ withClientBracket createClient = bracket createClient dispose
newChannelClient :: MonadIO m => RpcProtocol p => Channel -> m (Client p)
newChannelClient channel = do
stateMVar <- liftIO $ newMVar emptyClientState
callbacksVar <- liftIO $ newTVarIO mempty
let client = Client {
channel,
stateMVar
callbacksVar
}
channelSetBinaryHandler channel (clientHandleChannelMessage client)
pure client
......
......@@ -48,10 +48,10 @@ spec = describe "runMultiplexer" $ parallel $ do
do x
peekAwaitable var `shouldReturn` Just ()
it "can send and receive simple messages" $ do
fit "can send and receive simple messages" $ do
recvMVar <- newEmptyMVar
withEchoServer $ \channel -> do
channelSetSimpleBinaryHandler channel ((liftIO . putMVar recvMVar) :: BSL.ByteString -> ResourceManagerIO ())
channelSetSimpleHandler channel ((liftIO . putMVar recvMVar) :: BSL.ByteString -> ResourceManagerIO ())
channelSendSimple channel "foobar"
liftIO $ takeMVar recvMVar `shouldReturn` "foobar"
channelSendSimple channel "test"
......@@ -62,7 +62,7 @@ spec = describe "runMultiplexer" $ parallel $ do
it "can create sub-channels" $ do
recvMVar <- newEmptyMVar
withEchoServer $ \channel -> do
channelSetBinaryHandler channel ((\_ -> liftIO . putMVar recvMVar) :: ReceivedMessageResources -> BSL.ByteString -> ResourceManagerIO ())
channelSetHandler channel ((\_ -> liftIO . putMVar recvMVar) :: ReceivedMessageResources -> BSL.ByteString -> ResourceManagerIO ())
SentMessageResources{createdChannels=[_]} <- channelSend_ channel defaultMessageConfiguration{createChannels=1} "create a channel"
liftIO $ takeMVar recvMVar `shouldReturn` "create a channel"
SentMessageResources{createdChannels=[_, _, _]} <- channelSend_ channel defaultMessageConfiguration{createChannels=3} "create more channels"
......@@ -75,14 +75,14 @@ spec = describe "runMultiplexer" $ parallel $ do
c2RecvMVar <- newEmptyMVar
c3RecvMVar <- newEmptyMVar
withEchoServer $ \channel -> do
channelSetSimpleBinaryHandler channel $ (liftIO . putMVar recvMVar :: BSL.ByteString -> ResourceManagerIO ())
channelSetSimpleHandler channel $ (liftIO . putMVar recvMVar :: BSL.ByteString -> ResourceManagerIO ())
channelSendSimple channel "foobar"
liftIO $ takeMVar recvMVar `shouldReturn` "foobar"
SentMessageResources{createdChannels=[c1, c2]} <- channelSend_ channel defaultMessageConfiguration{createChannels=2} "create channels"
liftIO $ takeMVar recvMVar `shouldReturn` "create channels"
channelSetSimpleBinaryHandler c1 (liftIO . putMVar c1RecvMVar :: BSL.ByteString -> ResourceManagerIO ())
channelSetSimpleBinaryHandler c2 (liftIO . putMVar c2RecvMVar :: BSL.ByteString -> ResourceManagerIO ())
channelSetSimpleHandler c1 (liftIO . putMVar c1RecvMVar :: BSL.ByteString -> ResourceManagerIO ())
channelSetSimpleHandler c2 (liftIO . putMVar c2RecvMVar :: BSL.ByteString -> ResourceManagerIO ())
channelSendSimple c1 "test"
liftIO $ takeMVar c1RecvMVar `shouldReturn` "test"
......@@ -95,7 +95,7 @@ spec = describe "runMultiplexer" $ parallel $ do
SentMessageResources{createdChannels=[c3]} <- channelSend_ channel defaultMessageConfiguration{createChannels=1} "create another channel"
liftIO $ takeMVar recvMVar `shouldReturn` "create another channel"
channelSetSimpleBinaryHandler c3 (liftIO . putMVar c3RecvMVar :: BSL.ByteString -> ResourceManagerIO ())
channelSetSimpleHandler c3 (liftIO . putMVar c3RecvMVar :: BSL.ByteString -> ResourceManagerIO ())
channelSendSimple c3 "test5"
liftIO $ takeMVar c3RecvMVar `shouldReturn` "test5"
......@@ -118,10 +118,10 @@ spec = describe "runMultiplexer" $ parallel $ do
runMultiplexer MultiplexerSideA (liftIO . testAction) connection
withEchoServer :: (forall m. MonadResourceManager m => Channel -> m a) -> IO a
withEchoServer :: (Channel -> ResourceManagerIO ()) -> IO ()
withEchoServer fn = rm $ bracket setup closePair (\(channel, _) -> fn channel)
where
setup :: MonadResourceManager m => m (Channel, Channel)
setup :: ResourceManagerIO (Channel, Channel)
setup = do
(mainSocket, echoSocket) <- newConnectionPair
mainChannel <- newMultiplexer MultiplexerSideA mainSocket
......@@ -131,7 +131,7 @@ withEchoServer fn = rm $ bracket setup closePair (\(channel, _) -> fn channel)
closePair :: MonadResourceManager m => (Channel, Channel) -> m ()
closePair (x, y) = dispose x >> dispose y
configureEchoHandler :: MonadIO m => Channel -> m ()
configureEchoHandler channel = channelSetBinaryHandler channel (echoHandler channel)
configureEchoHandler channel = channelSetHandler channel (echoHandler channel)
echoHandler :: Channel -> ReceivedMessageResources -> BSL.ByteString -> ResourceManagerIO ()
echoHandler channel resources msg = do
mapM_ configureEchoHandler resources.createdChannels
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment