From 23e49c8eb575381c9050918062bdddf1590ba191 Mon Sep 17 00:00:00 2001 From: Jens Nolte <git@queezle.net> Date: Sun, 2 Jan 2022 01:21:31 +0100 Subject: [PATCH] Update quasar (splits MonadResourceManager and MonadIO constraints) --- flake.lock | 6 +-- src/Quasar/Network/Multiplexer.hs | 18 +++---- src/Quasar/Network/Runtime.hs | 69 +++++++++++++----------- src/Quasar/Network/Runtime/Observable.hs | 2 +- src/Quasar/Network/TH.hs | 2 +- test/Quasar/Network/MultiplexerSpec.hs | 9 ++-- test/Quasar/NetworkSpec.hs | 5 +- 7 files changed, 59 insertions(+), 52 deletions(-) diff --git a/flake.lock b/flake.lock index 4940d03..c13a18a 100644 --- a/flake.lock +++ b/flake.lock @@ -22,11 +22,11 @@ }, "locked": { "host": "git.c3pb.de", - "lastModified": 1639255442, - "narHash": "sha256-zlkViCgmxGlouprrc6RNquXYI3IOCvAUn36IdIAfKek=", + "lastModified": 1641081421, + "narHash": "sha256-LQiTNImd3/9Vbwidn7wmuwR1YI5K4ptX6GNVapPXSQg=", "owner": "jens", "repo": "quasar", - "rev": "b3a13dd9cf2841811f89812d27a5639f8e60821b", + "rev": "3094546aa29618a5d278b10eea354b9d6f383122", "type": "gitlab" }, "original": { diff --git a/src/Quasar/Network/Multiplexer.hs b/src/Quasar/Network/Multiplexer.hs index e839103..d346f4f 100644 --- a/src/Quasar/Network/Multiplexer.hs +++ b/src/Quasar/Network/Multiplexer.hs @@ -172,7 +172,7 @@ instance IsResourceManager Channel where newRootChannel :: Multiplexer -> ResourceManagerIO Channel newRootChannel multiplexer = do resourceManager <- askResourceManager - liftIO do + channel <- liftIO do channelHandler <- newTVarIO Nothing children <- newTVarIO mempty nextSendMessageId <- newTVarIO 0 @@ -180,7 +180,7 @@ newRootChannel multiplexer = do receivedCloseMessage <- newTVarIO False sentCloseMessage <- newTVarIO False - let channel = Channel { + pure Channel { multiplexer, resourceManager, channelId = 0, @@ -193,9 +193,9 @@ newRootChannel multiplexer = do sentCloseMessage } - attachDisposeAction_ resourceManager $ atomically $ sendChannelCloseMessage channel + registerDisposeAction $ atomically $ sendChannelCloseMessage channel - pure channel + pure channel newChannelSTM :: Channel -> ChannelId -> STM Channel newChannelSTM parent@Channel{multiplexer, resourceManager=parentResourceManager} channelId = do @@ -228,7 +228,7 @@ newChannelSTM parent@Channel{multiplexer, resourceManager=parentResourceManager} modifyTVar parent.children $ HM.insert channelId channel disposable <- newSTMDisposable $ sendChannelCloseMessage channel - attachDisposableSTM resourceManager disposable + attachDisposable resourceManager disposable modifyTVar multiplexer.channelsVar $ HM.insert channelId channel @@ -314,7 +314,7 @@ data ReceivedMessageResources = ReceivedMessageResources { -- | Starts a new multiplexer on the provided connection and blocks until it is closed. -- The channel is provided to a setup action and can be closed by calling `dispose`; otherwise the multiplexer will run until the underlying connection is closed. -runMultiplexer :: MonadResourceManager m => MultiplexerSide -> (Channel -> ResourceManagerIO ()) -> Connection -> m () +runMultiplexer :: (MonadResourceManager m, MonadIO m) => MultiplexerSide -> (Channel -> ResourceManagerIO ()) -> Connection -> m () runMultiplexer side channelSetupHook connection = liftResourceManagerIO $ mask_ do (rootChannel, result) <- newMultiplexerInternal side connection onResourceManager rootChannel $ channelSetupHook rootChannel @@ -322,7 +322,7 @@ runMultiplexer side channelSetupHook connection = liftResourceManagerIO $ mask_ mapM_ throwM mException -- | Starts a new multiplexer on an existing connection (e.g. on a connected TCP socket). -newMultiplexer :: MonadResourceManager m => MultiplexerSide -> Connection -> m Channel +newMultiplexer :: (MonadResourceManager m, MonadIO m) => MultiplexerSide -> Connection -> m Channel newMultiplexer side connection = liftResourceManagerIO $ mask_ do resourceManager <- askResourceManager (rootChannel, result) <- newMultiplexerInternal side connection @@ -382,7 +382,7 @@ newMultiplexerInternal side connection = disposeOnError do dispose receiveThread connection.close - putAsyncVar_ multiplexerResult =<< peekAwaitable multiplexerException + putAsyncVar_ multiplexerResult =<< peekAwaitable (await multiplexerException) pure Multiplexer { side, @@ -451,7 +451,7 @@ sendThread multiplexer sendFn = do closeChannelQueue <- swapTVar multiplexer.closeChannelOutbox [] case (mMessage, closeChannelQueue) of -- Exit when the receive thread has stopped and there is no error and no message left to send - (Nothing, []) -> pure () <$ await multiplexer.receiveThreadCompleted + (Nothing, []) -> pure () <$ awaitSTM multiplexer.receiveThreadCompleted _ -> pure do msg <- execWriterT do formatChannelMessage mMessage diff --git a/src/Quasar/Network/Runtime.hs b/src/Quasar/Network/Runtime.hs index 7bacefa..68a7783 100644 --- a/src/Quasar/Network/Runtime.hs +++ b/src/Quasar/Network/Runtime.hs @@ -152,35 +152,36 @@ streamClose = dispose -- ** Running client and server -withClientTCP :: (RpcProtocol p, MonadResourceManager m) => Socket.HostName -> Socket.ServiceName -> (Client p -> m a) -> m a +withClientTCP :: (RpcProtocol p, MonadResourceManager m, MonadIO m, MonadMask m) => Socket.HostName -> Socket.ServiceName -> (Client p -> m a) -> m a withClientTCP host port = withClientBracket (newClientTCP host port) -newClientTCP :: (RpcProtocol p, MonadResourceManager m) => Socket.HostName -> Socket.ServiceName -> m (Client p) +newClientTCP :: (RpcProtocol p, MonadResourceManager m, MonadIO m) => Socket.HostName -> Socket.ServiceName -> m (Client p) newClientTCP host port = newClient =<< connectTCP host port -withClientUnix :: (RpcProtocol p, MonadResourceManager m) => FilePath -> (Client p -> m a) -> m a +withClientUnix :: (RpcProtocol p, MonadResourceManager m, MonadIO m, MonadMask m) => FilePath -> (Client p -> m a) -> m a withClientUnix socketPath = withClientBracket (newClientUnix socketPath) -newClientUnix :: (MonadResourceManager m, RpcProtocol p) => FilePath -> m (Client p) +newClientUnix :: (RpcProtocol p, MonadResourceManager m, MonadIO m) => FilePath -> m (Client p) newClientUnix socketPath = - bracketOnError - do liftIO $ Socket.socket Socket.AF_UNIX Socket.Stream Socket.defaultProtocol - do liftIO . Socket.close - \sock -> do - liftIO do - Socket.withFdSocket sock Socket.setCloseOnExecIfNeeded - Socket.connect sock $ Socket.SockAddrUnix socketPath - newClient $ socketConnection socketPath sock - - -withClient :: forall p a m b. (RpcProtocol p, MonadResourceManager m) => Connection -> (Client p -> m b) -> m b + liftResourceManagerIO do + bracketOnError + do liftIO $ Socket.socket Socket.AF_UNIX Socket.Stream Socket.defaultProtocol + do liftIO . Socket.close + \sock -> do + liftIO do + Socket.withFdSocket sock Socket.setCloseOnExecIfNeeded + Socket.connect sock $ Socket.SockAddrUnix socketPath + newClient $ socketConnection socketPath sock + + +withClient :: forall p a m b. (RpcProtocol p, MonadResourceManager m, MonadIO m, MonadMask m) => Connection -> (Client p -> m b) -> m b withClient connection = withClientBracket (newClient connection) -newClient :: forall p a m. (RpcProtocol p, MonadResourceManager m) => Connection -> m (Client p) +newClient :: forall p a m. (RpcProtocol p, MonadResourceManager m, MonadIO m) => Connection -> m (Client p) newClient connection = newChannelClient =<< newMultiplexer MultiplexerSideA connection -withClientBracket :: (MonadResourceManager m) => m (Client p) -> (Client p -> m a) -> m a +withClientBracket :: (MonadResourceManager m, MonadIO m, MonadMask m) => m (Client p) -> (Client p -> m a) -> m a withClientBracket createClient = bracket createClient dispose @@ -211,7 +212,7 @@ instance IsDisposable (Server p) where toDisposable = toDisposable . toResourceManager -newServer :: forall p m. (HasProtocolImpl p, MonadResourceManager m) => ProtocolImpl p -> [Listener] -> m (Server p) +newServer :: forall p m. (HasProtocolImpl p, MonadResourceManager m, MonadIO m) => ProtocolImpl p -> [Listener] -> m (Server p) newServer protocolImpl listeners = do resourceManager <- newResourceManager let server = Server { resourceManager, protocolImpl } @@ -224,7 +225,7 @@ addListener server listener = captureDisposable_ $ async_ $ runListener listener where - runListener :: MonadResourceManager f => Listener -> f a + runListener :: Listener -> ResourceManagerIO a runListener (TcpPort mhost port) = runTCPListener server mhost port runListener (UnixSocket path) = runUnixSocketListener server path runListener (ListenSocket socket) = runListenerOnBoundSocket server socket @@ -232,13 +233,13 @@ addListener server listener = addListener_ :: (HasProtocolImpl p, MonadIO m) => Server p -> Listener -> m () addListener_ server listener = void $ addListener server listener -runServer :: forall p m. (HasProtocolImpl p, MonadResourceManager m) => ProtocolImpl p -> [Listener] -> m () -runServer _ [] = throwM $ userError "Tried to start a server without any listeners" +runServer :: forall p m. (HasProtocolImpl p, MonadResourceManager m, MonadIO m) => ProtocolImpl p -> [Listener] -> m () +runServer _ [] = liftIO $ throwM $ userError "Tried to start a server without any listeners" runServer protocolImpl listener = do server <- newServer @p protocolImpl listener - await $ isDisposed server + liftIO $ await $ isDisposed server -listenTCP :: forall p m. (HasProtocolImpl p, MonadResourceManager m) => ProtocolImpl p -> Maybe Socket.HostName -> Socket.ServiceName -> m () +listenTCP :: forall p m. (HasProtocolImpl p, MonadResourceManager m, MonadIO m) => ProtocolImpl p -> Maybe Socket.HostName -> Socket.ServiceName -> m () listenTCP impl mhost port = runServer @p impl [TcpPort mhost port] runTCPListener :: forall p a m. (HasProtocolImpl p, MonadIO m, MonadMask m) => Server p -> Maybe Socket.HostName -> Socket.ServiceName -> m a @@ -257,7 +258,7 @@ runTCPListener server mhost port = do Socket.bind sock (Socket.addrAddress addr) pure sock -listenUnix :: forall p m. (HasProtocolImpl p, MonadResourceManager m) => ProtocolImpl p -> FilePath -> m () +listenUnix :: forall p m. (HasProtocolImpl p, MonadResourceManager m, MonadIO m) => ProtocolImpl p -> FilePath -> m () listenUnix impl path = runServer @p impl [UnixSocket path] runUnixSocketListener :: forall p a m. (HasProtocolImpl p, MonadIO m, MonadMask m) => Server p -> FilePath -> m a @@ -279,7 +280,7 @@ runUnixSocketListener server socketPath = do pure sock -- | Listen and accept connections on an already bound socket. -listenOnBoundSocket :: forall p m. (HasProtocolImpl p, MonadResourceManager m) => ProtocolImpl p -> Socket.Socket -> m () +listenOnBoundSocket :: forall p m. (HasProtocolImpl p, MonadResourceManager m, MonadIO m) => ProtocolImpl p -> Socket.Socket -> m () listenOnBoundSocket protocolImpl socket = runServer @p protocolImpl [ListenSocket socket] runListenerOnBoundSocket :: forall p a m. (HasProtocolImpl p, MonadIO m, MonadMask m) => Server p -> Socket.Socket -> m a @@ -294,12 +295,14 @@ connectToServer server connection = -- Attach to server resource manager: When the server is closed, all listeners should be closed. onResourceManager server do asyncWithHandler_ (traceIO . formatException) do + --traceIO $ mconcat ["Client connected (", connection.description, ")"] -- This needs a resource manager which catches (and then logs) exceptions. Since that doesn't exist right now, -- a new resource manager root is used instead. withRootResourceManager do runMultiplexer MultiplexerSideB registerChannelServerHandler $ connection + --traceIO $ mconcat ["Client connection closed (", connection.description, ")"] where registerChannelServerHandler :: Channel -> ResourceManagerIO () registerChannelServerHandler channel = liftIO do @@ -314,21 +317,23 @@ connectToServer server connection = mconcat ["Client exception (", connection.description, "): ", (displayException ex)] -withLocalClient :: forall p a m. (HasProtocolImpl p, MonadResourceManager m) => Server p -> (Client p -> m a) -> m a +withLocalClient :: forall p a m. (HasProtocolImpl p, MonadResourceManager m, MonadIO m, MonadMask m) => Server p -> (Client p -> m a) -> m a withLocalClient server action = withScopedResourceManager do client <- newLocalClient server action client -newLocalClient :: forall p m. (HasProtocolImpl p, MonadResourceManager m) => Server p -> m (Client p) -newLocalClient server = mask_ do - (clientSocket, serverSocket) <- newConnectionPair - connectToServer server serverSocket - newClient @p clientSocket +newLocalClient :: forall p m. (HasProtocolImpl p, MonadResourceManager m, MonadIO m) => Server p -> m (Client p) +newLocalClient server = + liftResourceManagerIO do + mask_ do + (clientSocket, serverSocket) <- newConnectionPair + connectToServer server serverSocket + newClient @p clientSocket -- ** Test implementation -withStandaloneClient :: forall p a m. (HasProtocolImpl p, MonadResourceManager m) => ProtocolImpl p -> (Client p -> m a) -> m a +withStandaloneClient :: forall p a m. (HasProtocolImpl p, MonadResourceManager m, MonadIO m, MonadMask m) => ProtocolImpl p -> (Client p -> m a) -> m a withStandaloneClient impl runClientHook = do server <- newServer impl [] withLocalClient server runClientHook diff --git a/src/Quasar/Network/Runtime/Observable.hs b/src/Quasar/Network/Runtime/Observable.hs index 887c228..e972970 100644 --- a/src/Quasar/Network/Runtime/Observable.hs +++ b/src/Quasar/Network/Runtime/Observable.hs @@ -55,7 +55,7 @@ newObservableStub startRetrieveRequest startObserveRequest = pure uncachedObserv retrieveFn :: ResourceManagerIO (Awaitable v) retrieveFn = startRetrieveRequest -observeToStream :: (Binary v, MonadResourceManager m) => Observable v -> Stream (PackedObservableMessage v) Void -> m () +observeToStream :: (Binary v, MonadResourceManager m, MonadIO m, MonadMask m) => Observable v -> Stream (PackedObservableMessage v) Void -> m () observeToStream observable stream = do localResourceManager stream do observe observable \msg -> do diff --git a/src/Quasar/Network/TH.hs b/src/Quasar/Network/TH.hs index 27a6898..6398e13 100644 --- a/src/Quasar/Network/TH.hs +++ b/src/Quasar/Network/TH.hs @@ -29,7 +29,7 @@ import Quasar.Network.Multiplexer import Quasar.Network.Runtime import Quasar.Network.Runtime.Observable import Quasar.Observable -import Quasar.Prelude +import Quasar.Prelude hiding (Type) import Quasar.ResourceManager data RpcApi = RpcApi { diff --git a/test/Quasar/Network/MultiplexerSpec.hs b/test/Quasar/Network/MultiplexerSpec.hs index cf0970f..cf38dd3 100644 --- a/test/Quasar/Network/MultiplexerSpec.hs +++ b/test/Quasar/Network/MultiplexerSpec.hs @@ -1,6 +1,7 @@ module Quasar.Network.MultiplexerSpec (spec) where import Control.Concurrent.Async (concurrently_) +import Control.Concurrent.STM import Control.Concurrent.MVar import Control.Monad.Catch import Control.Monad.Reader (ReaderT) @@ -19,7 +20,7 @@ import Test.Hspec qualified as Hspec rm :: ResourceManagerIO a -> IO a rm = withRootResourceManager -shouldThrow :: (HasCallStack, Exception e, MonadResourceManager m) => (ReaderT ResourceManager IO a) -> Hspec.Selector e -> m () +shouldThrow :: (HasCallStack, Exception e, MonadResourceManager m, MonadIO m) => (ReaderT ResourceManager IO a) -> Hspec.Selector e -> m () shouldThrow action expected = do resourceManager <- askResourceManager liftIO $ (onResourceManager resourceManager action) `Hspec.shouldThrow` expected @@ -46,10 +47,10 @@ spec = parallel $ describe "runMultiplexer" $ do do MultiplexerSideA do \channel -> do - attachDisposeAction_ channel.resourceManager (putAsyncVar_ var ()) + liftIO $ atomically $ attachDisposeAction_ channel.resourceManager (putAsyncVar_ var ()) dispose channel do x - peekAwaitable var `shouldReturn` Just () + peekAwaitable (await var) `shouldReturn` Just () it "can send and receive simple messages" $ do recvMVar <- newEmptyMVar @@ -132,7 +133,7 @@ withEchoServer fn = rm $ bracket setup closePair (\(channel, _) -> fn channel) echoChannel <- newMultiplexer MultiplexerSideB echoSocket configureEchoHandler echoChannel pure (mainChannel, echoChannel) - closePair :: MonadResourceManager m => (Channel, Channel) -> m () + closePair :: (Channel, Channel) -> ResourceManagerIO () closePair (x, y) = dispose x >> dispose y configureEchoHandler :: MonadIO m => Channel -> m () configureEchoHandler channel = channelSetHandler channel (echoHandler channel) diff --git a/test/Quasar/NetworkSpec.hs b/test/Quasar/NetworkSpec.hs index d89ea27..f11a05e 100644 --- a/test/Quasar/NetworkSpec.hs +++ b/test/Quasar/NetworkSpec.hs @@ -29,10 +29,11 @@ import Test.Hspec qualified as Hspec import Test.QuickCheck import Test.QuickCheck.Monadic -rm :: (forall m. MonadResourceManager m => m a) -> IO a +-- Type is pinned to IO, otherwise hspec spec type cannot be inferred +rm :: ResourceManagerIO a -> IO a rm = withRootResourceManager -shouldThrow :: (HasCallStack, Exception e, MonadResourceManager m) => (ReaderT ResourceManager IO a) -> Hspec.Selector e -> m () +shouldThrow :: (HasCallStack, Exception e, MonadResourceManager m, MonadIO m) => (ReaderT ResourceManager IO a) -> Hspec.Selector e -> m () shouldThrow action expected = do resourceManager <- askResourceManager liftIO $ (onResourceManager resourceManager action) `Hspec.shouldThrow` expected -- GitLab