diff --git a/flake.lock b/flake.lock index 49b7e0a1dd69a6575e7cbd14adb14926c33dc199..8bd4d7ef45abb99af9919230ed892318b439e988 100644 --- a/flake.lock +++ b/flake.lock @@ -21,11 +21,11 @@ }, "locked": { "host": "git.c3pb.de", - "lastModified": 1630286998, - "narHash": "sha256-2rGt7QPAHXuHP8mn/vxzXJotrGcv93jyQTZTPrwZ1w8=", + "lastModified": 1630355486, + "narHash": "sha256-/VpwedqTMkDhwy8aFzWYMDxoMZA97LzasGCV5+v09Jc=", "owner": "jens", "repo": "quasar", - "rev": "499e025611d10540dbebf0e2732a0316a6e111bd", + "rev": "b6d957517d5f3796796f14f21af2a34023ac5002", "type": "gitlab" }, "original": { diff --git a/quasar-network.cabal b/quasar-network.cabal index baa3650ba27c454728029edd3539d14b35e8db5c..1e2d888437928795894488dd64c4f4074c567a94 100644 --- a/quasar-network.cabal +++ b/quasar-network.cabal @@ -63,6 +63,7 @@ common shared-properties async, binary, bytestring, + exceptions, hashable, mtl, network, diff --git a/src/Quasar/Network/Multiplexer.hs b/src/Quasar/Network/Multiplexer.hs index 999963a720f8d0c402a4999ece98183585ab4a8a..6a45bbcf8028c6cb7386326d6a391ce0aa7647bf 100644 --- a/src/Quasar/Network/Multiplexer.hs +++ b/src/Quasar/Network/Multiplexer.hs @@ -25,7 +25,8 @@ module Quasar.Network.Multiplexer ( import Control.Concurrent.Async (AsyncCancelled(..), async, link, race_, wait, waitAnyCancel, withAsync, withAsyncWithUnmask) -import Control.Exception (Exception(..), Handler(..), MaskingState(Unmasked), SomeException(..), catch, catches, handle, interruptible, getMaskingState, mask_) +import Control.Exception (MaskingState(Unmasked), interruptible, getMaskingState) +import Control.Monad.Catch import Control.Monad.State (StateT, execStateT, runStateT, lift) import qualified Control.Monad.State as State import Control.Concurrent.MVar @@ -37,6 +38,8 @@ import qualified Data.ByteString.Lazy as BSL import qualified Data.HashMap.Strict as HM import Data.Tuple (swap) import Data.Word +import Quasar.Awaitable +import Quasar.Disposable import Quasar.Network.Connection import Quasar.Prelude import System.IO (hPutStrLn, stderr) @@ -119,6 +122,7 @@ instance HasResourceManager Channel where getResourceManager channel = channel.resourceManager data ChannelState = ChannelState { + resourceManager :: ResourceManager, connectionState :: ChannelConnectivity, children :: [Channel] } @@ -172,7 +176,7 @@ newMultiplexer side x = do -- | Starts a new multiplexer on the provided connection and blocks until it is closed. -- The channel is provided to a setup action and can be closed by calling 'closeChannel'; otherwise the multiplexer will run until the underlying connection is closed. runMultiplexer :: (IsConnection a) => MultiplexerSide -> (Channel -> IO ()) -> a -> IO () -runMultiplexer side channelSetupHook connection = do +runMultiplexer side channelSetupHook connection = withResourceManager \resourceManager -> do -- Running in masked state, this thread (running the receive-function) cannot be interrupted when closing the connection maskingState <- getMaskingState when (maskingState /= Unmasked) (fail "'runMultiplexer' cannot run in masked thread state.") @@ -193,7 +197,7 @@ runMultiplexer side channelSetupHook connection = do } run :: IO (MultiplexerException, Bool) run = do - rootChannel <- withMultiplexerState worker (newChannel worker 0 Connected) + rootChannel <- withMultiplexerState worker (newChannel resourceManager worker 0 Connected) channelSetupHook rootChannel withAsync (receiveThread worker) $ \receiveTask -> withAsync (throwIO =<< multiplexerWaitUntilClosed worker) $ \waitForCloseTask -> do @@ -424,21 +428,32 @@ multiplexerSwitchChannel worker channelId = do -- | Closes a channel and all it's children: After the function completes, the channel callback will no longer be called on received messages and sending messages on the channel will fail. -- Calling close on a closed channel is a noop. -channelClose :: Channel -> IO () -channelClose channel = do - -- Change channel state of all unclosed channels in the tree to closed - channelWasClosed <- channelClose' channel - - when channelWasClosed $ - -- Closing a channel on a Connection that is no longer connected should not throw an exception (channelClose is a resource management operation and is supposed to be idempotent) - handle (\(_ :: MultiplexerException) -> pure ()) $ do - -- Send close message - withMultiplexerState worker $ do - multiplexerSwitchChannel worker channel.channelId - multiplexerStateSend worker CloseChannel - - -- Terminate the worker when the root channel is closed - when (channel.channelId == 0) $ multiplexerClose_ ConnectionClosed worker +-- +-- Alias for `dispose`. +channelClose :: Channel -> IO (Awaitable ()) +channelClose = dispose + +instance IsDisposable Channel where + toDisposable channel = toDisposable channel.resourceManager + +disposeChannel :: Channel -> IO (Awaitable ()) +disposeChannel channel = do + -- Change channel state of all unclosed channels in the tree to closed + channelWasClosed <- channelClose' channel + + when channelWasClosed $ + -- Closing a channel on a Connection that is no longer connected should not throw an exception (channelClose is a resource management operation and is supposed to be idempotent) + handle (\(_ :: MultiplexerException) -> pure ()) $ do + -- Send close message + withMultiplexerState worker $ do + multiplexerSwitchChannel worker channel.channelId + multiplexerStateSend worker CloseChannel + + -- Terminate the worker when the root channel is closed + when (channel.channelId == 0) $ multiplexerClose_ ConnectionClosed worker + + -- Return completed awaitabe (so this can be inserted into a ResourceManager) + pure $ pure () where worker = channel.worker channelClose' :: Channel -> IO Bool @@ -529,7 +544,7 @@ newSubChannel :: MultiplexerWorker -> ChannelId -> (StateT ChannelState (StateT newSubChannel worker channelId = do parentChannelState <- State.get -- Holding the parents channelState while initializing the channel will ensure the ChannelConnectivity is inherited atomically - createdChannel <- lift $ newChannel worker channelId parentChannelState.connectionState + createdChannel <- lift $ newChannel parentChannelState.resourceManager worker channelId parentChannelState.connectionState let newParentState = parentChannelState{ children = createdChannel : parentChannelState.children @@ -537,9 +552,11 @@ newSubChannel worker channelId = do State.put newParentState pure createdChannel -newChannel :: MultiplexerWorker -> ChannelId -> ChannelConnectivity -> StateT MultiplexerWorkerState IO Channel -newChannel worker channelId connectionState = do +newChannel :: ResourceManager -> MultiplexerWorker -> ChannelId -> ChannelConnectivity -> StateT MultiplexerWorkerState IO Channel +newChannel parentResourceManager worker channelId connectionState = do + resourceManager <- newResourceManager parentResourceManager stateMVar <- liftIO $ newMVar ChannelState { + resourceManager, connectionState, children = [] } @@ -551,6 +568,7 @@ newChannel worker channelId connectionState = do } handlerAtVar <- liftIO newEmptyAtVar let channel = Channel { + resourceManager, worker, channelId, stateMVar, @@ -558,6 +576,7 @@ newChannel worker channelId connectionState = do receiveStateMVar, handlerAtVar } + attachDisposeAction resourceManager (disposeChannel channel) State.modify $ \multiplexerState -> multiplexerState{channels = HM.insert channelId channel multiplexerState.channels} pure channel diff --git a/src/Quasar/Network/Runtime.hs b/src/Quasar/Network/Runtime.hs index 9a57e1b003bdce9d6e09e9e69312fe1765e2ae0e..3aa9caec760329c0c130d0344a0413254b1493b1 100644 --- a/src/Quasar/Network/Runtime.hs +++ b/src/Quasar/Network/Runtime.hs @@ -117,7 +117,7 @@ clientHandleChannelMessage client resources msg = case decodeOrFail msg of Nothing -> channelReportProtocolError client.channel ("Received response with invalid request id " <> show requestId) callback resp -clientClose :: Client p -> IO () +clientClose :: Client p -> IO (Awaitable ()) clientClose client = channelClose client.channel clientReportProtocolError :: Client p -> String -> IO a @@ -156,7 +156,7 @@ streamSend (Stream channel) value = liftIO $ channelSendSimple channel (encode v streamSetHandler :: (Binary down, MonadIO m) => Stream up down -> (down -> IO ()) -> m () streamSetHandler (Stream channel) handler = liftIO $ channelSetSimpleHandler channel handler -streamClose :: MonadIO m => Stream up down -> m () +streamClose :: MonadIO m => Stream up down -> m (Awaitable ()) streamClose (Stream channel) = liftIO $ channelClose channel -- ** Running client and server diff --git a/src/Quasar/Network/Runtime/Observable.hs b/src/Quasar/Network/Runtime/Observable.hs index 18e48e5af654ea048d5269a8a5535c6d9fc29317..eb4e7ec781b3f88ebd9baa1e67cd54af66475543 100644 --- a/src/Quasar/Network/Runtime/Observable.hs +++ b/src/Quasar/Network/Runtime/Observable.hs @@ -44,7 +44,7 @@ newObservableStub startRetrieveRequest startObserveRequest = pure uncachedObserv -- TODO send updates about the connection status stream <- startObserveRequest streamSetHandler stream (callback . unpackObservableMessage) - synchronousDisposable $ streamClose stream + newDisposable $ streamClose stream retrieveFn :: forall m. MonadAsync m => m (Task v) retrieveFn = toTask <$> startRetrieveRequest diff --git a/test/Quasar/Network/MultiplexerSpec.hs b/test/Quasar/Network/MultiplexerSpec.hs index 399db6bf821ca8387f4daffef35280befd38cf91..34717f28502b61e72aeaeb965e1af9e8c14b0556 100644 --- a/test/Quasar/Network/MultiplexerSpec.hs +++ b/test/Quasar/Network/MultiplexerSpec.hs @@ -6,9 +6,11 @@ import Control.Concurrent.MVar import Control.Exception (bracket, mask_) import Control.Monad (forever, void, unless) import qualified Data.ByteString.Lazy as BSL -import Prelude +import Quasar.Awaitable +import Quasar.Disposable import Quasar.Network.Multiplexer import Quasar.Network.Connection +import Quasar.Prelude import Network.Socket import Test.Hspec @@ -16,17 +18,29 @@ spec :: Spec spec = describe "runMultiplexer" $ parallel $ do it "can be closed from the channelSetupHook" $ do (x, _) <- newDummySocketPair - runMultiplexer MultiplexerSideA channelClose x + runMultiplexer MultiplexerSideA (await <=< channelClose) x it "fails when run in masked state" $ do (x, _) <- newDummySocketPair - mask_ $ runMultiplexer MultiplexerSideA channelClose x `shouldThrow` anyException + mask_ $ runMultiplexer MultiplexerSideA (await <=< channelClose) x `shouldThrow` anyException it "closes when the remote is closed" $ do (x, y) <- newDummySocketPair concurrently_ (runMultiplexer MultiplexerSideA (const (pure ())) x) - (runMultiplexer MultiplexerSideB channelClose y) + (runMultiplexer MultiplexerSideB (await <=< channelClose) y) + + it "can dispose a resource" $ do + var <- newAsyncVar + (x, _) <- newDummySocketPair + runMultiplexer + do MultiplexerSideA + do + \channel -> do + attachDisposeAction_ channel.resourceManager (pure () <$ putAsyncVar_ var ()) + await =<< channelClose channel + do x + peekAwaitable var `shouldReturn` Just () it "can send and receive simple messages" $ do recvMVar <- newEmptyMVar @@ -110,7 +124,7 @@ withEchoServer fn = bracket setup closePair (\(channel, _) -> fn channel) configureEchoHandler echoChannel pure (mainChannel, echoChannel) closePair :: (Channel, Channel) -> IO () - closePair (x, y) = channelClose x >> channelClose y + closePair (x, y) = await =<< liftA2 (<>) (channelClose x) (channelClose y) configureEchoHandler :: Channel -> IO () configureEchoHandler channel = channelSetHandler channel (echoHandler channel) echoHandler :: Channel -> ReceivedMessageResources -> BSL.ByteString -> IO () diff --git a/test/Quasar/NetworkSpec.hs b/test/Quasar/NetworkSpec.hs index 465d1d34ebc0c10f69c8e94345e9a331b11c5b85..d49dda32163f0ea1eb3cfa242827e17dd9f3d69f 100644 --- a/test/Quasar/NetworkSpec.hs +++ b/test/Quasar/NetworkSpec.hs @@ -105,13 +105,12 @@ spec = parallel $ do describe "StreamExample" $ do it "can open and close a stream" $ do withStandaloneClient @StreamExampleProtocol streamExampleProtocolImpl $ \client -> do - streamClose =<< createMultiplyStream client + await =<< streamClose =<< createMultiplyStream client it "can open multiple streams in a single rpc call" $ do withStandaloneClient @StreamExampleProtocol streamExampleProtocolImpl $ \client -> do (stream1, stream2) <- createStreams client - streamClose stream1 - streamClose stream2 + await =<< liftA2 (<>) (streamClose stream1) (streamClose stream2) aroundAll (\x -> withStandaloneClient @StreamExampleProtocol streamExampleProtocolImpl $ \client -> do resultMVar <- liftIO newEmptyMVar @@ -175,9 +174,7 @@ spec = parallel $ do setObservableVar var 42 latestShouldBe 42 - it "receives no further updates after disposing the callback registration" $ do - pendingWith "not implemented" - + it "receives no further updates after unsubscribing" $ do var <- newObservableVar 42 withStandaloneClient @ObservableExampleProtocol (ObservableExampleProtocolImpl (toObservable var)) $ \client -> do resultVar <- newTVarIO ObservableLoading