Skip to content
Snippets Groups Projects
Commit 7d78eb96 authored by Jens Nolte's avatar Jens Nolte
Browse files

Implement Observable (WIP)

Recovered from a backup
parent b9962634
No related branches found
No related tags found
No related merge requests found
...@@ -21,11 +21,11 @@ ...@@ -21,11 +21,11 @@
}, },
"locked": { "locked": {
"host": "git.c3pb.de", "host": "git.c3pb.de",
"lastModified": 1626923341, "lastModified": 1628533241,
"narHash": "sha256-CWvh6F6d1kEN6IpMvDBxSBNl4oJP2FhRGU5uGLwZSBw=", "narHash": "sha256-nvAqgEzmdYhvwTb0y6Vico4EvyOT1ehgzZU7/LrnW2g=",
"owner": "jens", "owner": "jens",
"repo": "quasar", "repo": "quasar",
"rev": "458784d70f664f3af9b98655505ca93e72610376", "rev": "894908377f0ee5cf626a6bdd8c4fdd29411b8e80",
"type": "gitlab" "type": "gitlab"
}, },
"original": { "original": {
......
...@@ -85,6 +85,7 @@ library ...@@ -85,6 +85,7 @@ library
Quasar.Network.Connection Quasar.Network.Connection
Quasar.Network.Multiplexer Quasar.Network.Multiplexer
Quasar.Network.Runtime Quasar.Network.Runtime
Quasar.Network.Runtime.Observable
Quasar.Network.SocketLocation Quasar.Network.SocketLocation
Quasar.Network.TH Quasar.Network.TH
hs-source-dirs: hs-source-dirs:
......
...@@ -11,6 +11,7 @@ module Quasar.Network ( ...@@ -11,6 +11,7 @@ module Quasar.Network (
addResult, addResult,
addStream, addStream,
setFixedHandler, setFixedHandler,
rpcObservable,
-- * Runtime -- * Runtime
......
...@@ -47,7 +47,7 @@ import Data.Binary (Binary, encode, decodeOrFail) ...@@ -47,7 +47,7 @@ import Data.Binary (Binary, encode, decodeOrFail)
import qualified Data.ByteString.Lazy as BSL import qualified Data.ByteString.Lazy as BSL
import qualified Data.HashMap.Strict as HM import qualified Data.HashMap.Strict as HM
import qualified Network.Socket as Socket import qualified Network.Socket as Socket
import Quasar.Core import Quasar.Awaitable
import Quasar.Network.Connection import Quasar.Network.Connection
import Quasar.Network.Multiplexer import Quasar.Network.Multiplexer
import Quasar.Prelude import Quasar.Prelude
...@@ -82,13 +82,13 @@ emptyClientState = ClientState { ...@@ -82,13 +82,13 @@ emptyClientState = ClientState {
clientSend :: forall p m. (MonadIO m, RpcProtocol p) => Client p -> MessageConfiguration -> ProtocolRequest p -> m SentMessageResources clientSend :: forall p m. (MonadIO m, RpcProtocol p) => Client p -> MessageConfiguration -> ProtocolRequest p -> m SentMessageResources
clientSend client config req = liftIO $ channelSend_ client.channel config (encode req) clientSend client config req = liftIO $ channelSend_ client.channel config (encode req)
clientRequest :: forall p m a. (MonadIO m, RpcProtocol p) => Client p -> (ProtocolResponse p -> Maybe a) -> MessageConfiguration -> ProtocolRequest p -> m (Async a, SentMessageResources) clientRequest :: forall p m a. (MonadIO m, RpcProtocol p) => Client p -> (ProtocolResponse p -> Maybe a) -> MessageConfiguration -> ProtocolRequest p -> m (Awaitable a, SentMessageResources)
clientRequest client checkResponse config req = do clientRequest client checkResponse config req = do
resultAsync <- newAsyncVar resultAsync <- newAsyncVar
sentMessageResources <- liftIO $ channelSend client.channel config (encode req) $ \msgId -> sentMessageResources <- liftIO $ channelSend client.channel config (encode req) $ \msgId ->
modifyMVar_ client.stateMVar $ modifyMVar_ client.stateMVar $
\state -> pure state{callbacks = HM.insert msgId (requestCompletedCallback resultAsync msgId) state.callbacks} \state -> pure state{callbacks = HM.insert msgId (requestCompletedCallback resultAsync msgId) state.callbacks}
pure (toAsync resultAsync, sentMessageResources) pure (toAwaitable resultAsync, sentMessageResources)
where where
requestCompletedCallback :: AsyncVar a -> MessageId -> ProtocolResponse p -> IO () requestCompletedCallback :: AsyncVar a -> MessageId -> ProtocolResponse p -> IO ()
requestCompletedCallback resultAsync msgId response = do requestCompletedCallback resultAsync msgId response = do
...@@ -97,7 +97,7 @@ clientRequest client checkResponse config req = do ...@@ -97,7 +97,7 @@ clientRequest client checkResponse config req = do
case checkResponse response of case checkResponse response of
Nothing -> clientReportProtocolError client "Invalid response" Nothing -> clientReportProtocolError client "Invalid response"
Just result -> putAsyncVar resultAsync result Just result -> putAsyncVar_ resultAsync result
clientHandleChannelMessage :: forall p. (RpcProtocol p) => Client p -> ReceivedMessageResources -> BSL.ByteString -> IO () clientHandleChannelMessage :: forall p. (RpcProtocol p) => Client p -> ReceivedMessageResources -> BSL.ByteString -> IO ()
clientHandleChannelMessage client resources msg = case decodeOrFail msg of clientHandleChannelMessage client resources msg = case decodeOrFail msg of
...@@ -153,14 +153,14 @@ streamClose (Stream channel) = liftIO $ channelClose channel ...@@ -153,14 +153,14 @@ streamClose (Stream channel) = liftIO $ channelClose channel
-- ** Running client and server -- ** Running client and server
withClientTCP :: RpcProtocol p => Socket.HostName -> Socket.ServiceName -> (Client p -> AsyncIO a) -> IO a withClientTCP :: RpcProtocol p => Socket.HostName -> Socket.ServiceName -> (Client p -> IO a) -> IO a
withClientTCP host port = withClientBracket (newClientTCP host port) withClientTCP host port = withClientBracket (newClientTCP host port)
newClientTCP :: forall p. RpcProtocol p => Socket.HostName -> Socket.ServiceName -> IO (Client p) newClientTCP :: forall p. RpcProtocol p => Socket.HostName -> Socket.ServiceName -> IO (Client p)
newClientTCP host port = newClient =<< connectTCP host port newClientTCP host port = newClient =<< connectTCP host port
withClientUnix :: RpcProtocol p => FilePath -> (Client p -> AsyncIO a) -> IO a withClientUnix :: RpcProtocol p => FilePath -> (Client p -> IO a) -> IO a
withClientUnix socketPath = withClientBracket (newClientUnix socketPath) withClientUnix socketPath = withClientBracket (newClientUnix socketPath)
newClientUnix :: RpcProtocol p => FilePath -> IO (Client p) newClientUnix :: RpcProtocol p => FilePath -> IO (Client p)
...@@ -170,14 +170,14 @@ newClientUnix socketPath = bracketOnError (Socket.socket Socket.AF_UNIX Socket.S ...@@ -170,14 +170,14 @@ newClientUnix socketPath = bracketOnError (Socket.socket Socket.AF_UNIX Socket.S
newClient sock newClient sock
withClient :: forall p a b. (IsConnection a, RpcProtocol p) => a -> (Client p -> AsyncIO b) -> IO b withClient :: forall p a b. (IsConnection a, RpcProtocol p) => a -> (Client p -> IO b) -> IO b
withClient connection = withClientBracket (newClient connection) withClient connection = withClientBracket (newClient connection)
newClient :: forall p a. (IsConnection a, RpcProtocol p) => a -> IO (Client p) newClient :: forall p a. (IsConnection a, RpcProtocol p) => a -> IO (Client p)
newClient connection = newChannelClient =<< newMultiplexer MultiplexerSideA (toSocketConnection connection) newClient connection = newChannelClient =<< newMultiplexer MultiplexerSideA (toSocketConnection connection)
withClientBracket :: forall p a. (RpcProtocol p) => IO (Client p) -> (Client p -> AsyncIO a) -> IO a withClientBracket :: forall p a. (RpcProtocol p) => IO (Client p) -> (Client p -> IO a) -> IO a
withClientBracket createClient action = bracket createClient clientClose $ \client -> runAsyncIO (action client) withClientBracket createClient = bracket createClient clientClose
newChannelClient :: RpcProtocol p => Channel -> IO (Client p) newChannelClient :: RpcProtocol p => Channel -> IO (Client p)
...@@ -293,8 +293,8 @@ runServerHandler protocolImpl = runMultiplexer MultiplexerSideB registerChannelS ...@@ -293,8 +293,8 @@ runServerHandler protocolImpl = runMultiplexer MultiplexerSideB registerChannelS
registerChannelServerHandler channel = channelSetHandler channel (serverHandleChannelMessage @p protocolImpl channel) registerChannelServerHandler channel = channelSetHandler channel (serverHandleChannelMessage @p protocolImpl channel)
withLocalClient :: forall p a. (RpcProtocol p, HasProtocolImpl p) => Server p -> ((Client p) -> AsyncIO a) -> IO a withLocalClient :: forall p m a. (RpcProtocol p, HasProtocolImpl p) => Server p -> (Client p -> IO a) -> IO a
withLocalClient server action = bracket (newLocalClient server) clientClose $ \client -> runAsyncIO (action client) withLocalClient server = bracket (newLocalClient server) clientClose
newLocalClient :: forall p. (RpcProtocol p, HasProtocolImpl p) => Server p -> IO (Client p) newLocalClient :: forall p. (RpcProtocol p, HasProtocolImpl p) => Server p -> IO (Client p)
newLocalClient server = do newLocalClient server = do
...@@ -306,5 +306,5 @@ newLocalClient server = do ...@@ -306,5 +306,5 @@ newLocalClient server = do
-- ** Test implementation -- ** Test implementation
withStandaloneClient :: forall p a. (RpcProtocol p, HasProtocolImpl p) => ProtocolImpl p -> (Client p -> AsyncIO a) -> IO a withStandaloneClient :: forall p a. (RpcProtocol p, HasProtocolImpl p) => ProtocolImpl p -> (Client p -> IO a) -> IO a
withStandaloneClient impl runClientHook = withServer impl [] $ \server -> withLocalClient server runClientHook withStandaloneClient impl runClientHook = withServer impl [] $ \server -> withLocalClient server runClientHook
module Quasar.Network.Runtime.Observable () where
import Quasar.Network.Runtime
import Quasar.Core
import Quasar.Observable
import Quasar.Prelude
newNetworkObservable
:: ((ObservableMessage v -> IO ()) -> IO Disposable)
-> (forall m. HasResourceManager m => m (Task v))
-> IO (Observable v)
newNetworkObservable observeFn retrieveFn = pure $ fnObservable observeFn retrieveFn
This diff is collapsed.
...@@ -11,8 +11,9 @@ ...@@ -11,8 +11,9 @@
module Quasar.NetworkSpec where module Quasar.NetworkSpec where
import Control.Concurrent.MVar import Control.Concurrent.MVar
import Control.Monad.IO.Class (liftIO) import Control.Monad.IO.Class (MonadIO, liftIO)
import Prelude import Prelude
import Quasar.Awaitable
import Quasar.Core import Quasar.Core
import Quasar.Network import Quasar.Network
import Quasar.Network.Runtime (withStandaloneClient) import Quasar.Network.Runtime (withStandaloneClient)
...@@ -25,38 +26,43 @@ shouldReturnAsync :: (HasCallStack, Show a, Eq a) => AsyncIO a -> a -> AsyncIO ( ...@@ -25,38 +26,43 @@ shouldReturnAsync :: (HasCallStack, Show a, Eq a) => AsyncIO a -> a -> AsyncIO (
action `shouldReturnAsync` expected = action >>= liftIO . (`shouldBe` expected) action `shouldReturnAsync` expected = action >>= liftIO . (`shouldBe` expected)
$(makeRpc $ rpcApi "Example" [ $(makeRpc $ rpcApi "Example" $ do
rpcFunction "fixedHandler42" $ do rpcFunction "fixedHandler42" $ do
addArgument "arg" [t|Int|] addArgument "arg" [t|Int|]
addResult "result" [t|Bool|] addResult "result" [t|Bool|]
setFixedHandler [| pure . (== 42) |], setFixedHandler [| pure . (== 42) |]
rpcFunction "fixedHandlerInc" $ do rpcFunction "fixedHandlerInc" $ do
addArgument "arg" [t|Int|] addArgument "arg" [t|Int|]
addResult "result" [t|Int|] addResult "result" [t|Int|]
setFixedHandler [| pure . (+ 1) |], setFixedHandler [| pure . (+ 1) |]
rpcFunction "multiArgs" $ do rpcFunction "multiArgs" $ do
addArgument "one" [t|Int|] addArgument "one" [t|Int|]
addArgument "two" [t|Int|] addArgument "two" [t|Int|]
addArgument "three" [t|Bool|] addArgument "three" [t|Bool|]
addResult "result" [t|Int|] addResult "result" [t|Int|]
addResult "result2" [t|Bool|], addResult "result2" [t|Bool|]
rpcFunction "noArgs" $ do rpcFunction "noArgs" $ do
addResult "result" [t|Int|], addResult "result" [t|Int|]
rpcFunction "noResponse" $ do rpcFunction "noResponse" $ do
addArgument "arg" [t|Int|], addArgument "arg" [t|Int|]
rpcFunction "noNothing" $ pure () rpcFunction "noNothing" $ pure ()
] )
)
$(makeRpc $ rpcApi "StreamExample" [ $(makeRpc $ rpcApi "StreamExample" $ do
rpcFunction "createMultiplyStream" $ do rpcFunction "createMultiplyStream" $ do
addStream "stream" [t|(Int, Int)|] [t|Int|] addStream "stream" [t|(Int, Int)|] [t|Int|]
,
rpcFunction "createStreams" $ do rpcFunction "createStreams" $ do
addStream "stream1" [t|Bool|] [t|Bool|] addStream "stream1" [t|Bool|] [t|Bool|]
addStream "stream2" [t|Int|] [t|Int|] addStream "stream2" [t|Int|] [t|Int|]
]
) rpcObservable "intObservable" [t|Int|]
)
exampleProtocolImpl :: ExampleProtocolImpl exampleProtocolImpl :: ExampleProtocolImpl
exampleProtocolImpl = ExampleProtocolImpl { exampleProtocolImpl = ExampleProtocolImpl {
...@@ -72,9 +78,9 @@ streamExampleProtocolImpl = StreamExampleProtocolImpl { ...@@ -72,9 +78,9 @@ streamExampleProtocolImpl = StreamExampleProtocolImpl {
createStreamsImpl createStreamsImpl
} }
where where
createMultiplyStreamImpl :: Stream Int (Int, Int) -> IO () createMultiplyStreamImpl :: MonadIO m => Stream Int (Int, Int) -> m ()
createMultiplyStreamImpl stream = streamSetHandler stream $ \(x, y) -> streamSend stream (x * y) createMultiplyStreamImpl stream = streamSetHandler stream $ \(x, y) -> streamSend stream (x * y)
createStreamsImpl :: Stream Bool Bool -> Stream Int Int -> IO () createStreamsImpl :: MonadIO m => Stream Bool Bool -> Stream Int Int -> m ()
createStreamsImpl stream1 stream2 = do createStreamsImpl stream1 stream2 = do
streamSetHandler stream1 $ streamSend stream1 streamSetHandler stream1 $ streamSend stream1
streamSetHandler stream2 $ streamSend stream2 streamSetHandler stream2 $ streamSend stream2
...@@ -84,10 +90,10 @@ spec = parallel $ do ...@@ -84,10 +90,10 @@ spec = parallel $ do
describe "Example" $ do describe "Example" $ do
it "works" $ do it "works" $ do
withStandaloneClient @ExampleProtocol exampleProtocolImpl $ \client -> do withStandaloneClient @ExampleProtocol exampleProtocolImpl $ \client -> do
awaitResult (fixedHandler42 client 5) `shouldReturnAsync` False (awaitIO =<< fixedHandler42 client 5) `shouldReturn` False
awaitResult (fixedHandler42 client 42) `shouldReturnAsync` True (awaitIO =<< fixedHandler42 client 42) `shouldReturn` True
awaitResult (fixedHandlerInc client 41) `shouldReturnAsync` 42 (awaitIO =<< fixedHandlerInc client 41) `shouldReturn` 42
awaitResult (multiArgs client 10 3 False) `shouldReturnAsync` (13, True) (awaitIO =<< multiArgs client 10 3 False) `shouldReturn` (13, True)
noResponse client 1337 noResponse client 1337
noNothing client noNothing client
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment