From 57cccfddcbaf559dbb74339ddb25844961236eb2 Mon Sep 17 00:00:00 2001
From: Jens Nolte <git@queezle.net>
Date: Mon, 30 Aug 2021 22:42:43 +0200
Subject: [PATCH] Implement simple multiplexer resource management

Co-authored-by: Jan Beinke <git@janbeinke.com>
---
 flake.lock                               |  6 +--
 quasar-network.cabal                     |  1 +
 src/Quasar/Network/Multiplexer.hs        | 61 ++++++++++++++++--------
 src/Quasar/Network/Runtime.hs            |  4 +-
 src/Quasar/Network/Runtime/Observable.hs |  2 +-
 test/Quasar/Network/MultiplexerSpec.hs   | 24 ++++++++--
 test/Quasar/NetworkSpec.hs               |  9 ++--
 7 files changed, 69 insertions(+), 38 deletions(-)

diff --git a/flake.lock b/flake.lock
index 49b7e0a..8bd4d7e 100644
--- a/flake.lock
+++ b/flake.lock
@@ -21,11 +21,11 @@
       },
       "locked": {
         "host": "git.c3pb.de",
-        "lastModified": 1630286998,
-        "narHash": "sha256-2rGt7QPAHXuHP8mn/vxzXJotrGcv93jyQTZTPrwZ1w8=",
+        "lastModified": 1630355486,
+        "narHash": "sha256-/VpwedqTMkDhwy8aFzWYMDxoMZA97LzasGCV5+v09Jc=",
         "owner": "jens",
         "repo": "quasar",
-        "rev": "499e025611d10540dbebf0e2732a0316a6e111bd",
+        "rev": "b6d957517d5f3796796f14f21af2a34023ac5002",
         "type": "gitlab"
       },
       "original": {
diff --git a/quasar-network.cabal b/quasar-network.cabal
index baa3650..1e2d888 100644
--- a/quasar-network.cabal
+++ b/quasar-network.cabal
@@ -63,6 +63,7 @@ common shared-properties
     async,
     binary,
     bytestring,
+    exceptions,
     hashable,
     mtl,
     network,
diff --git a/src/Quasar/Network/Multiplexer.hs b/src/Quasar/Network/Multiplexer.hs
index 999963a..6a45bbc 100644
--- a/src/Quasar/Network/Multiplexer.hs
+++ b/src/Quasar/Network/Multiplexer.hs
@@ -25,7 +25,8 @@ module Quasar.Network.Multiplexer (
 
 
 import Control.Concurrent.Async (AsyncCancelled(..), async, link, race_, wait, waitAnyCancel, withAsync, withAsyncWithUnmask)
-import Control.Exception (Exception(..), Handler(..), MaskingState(Unmasked), SomeException(..), catch, catches, handle, interruptible, getMaskingState, mask_)
+import Control.Exception (MaskingState(Unmasked), interruptible, getMaskingState)
+import Control.Monad.Catch
 import Control.Monad.State (StateT, execStateT, runStateT, lift)
 import qualified Control.Monad.State as State
 import Control.Concurrent.MVar
@@ -37,6 +38,8 @@ import qualified Data.ByteString.Lazy as BSL
 import qualified Data.HashMap.Strict as HM
 import Data.Tuple (swap)
 import Data.Word
+import Quasar.Awaitable
+import Quasar.Disposable
 import Quasar.Network.Connection
 import Quasar.Prelude
 import System.IO (hPutStrLn, stderr)
@@ -119,6 +122,7 @@ instance HasResourceManager Channel where
   getResourceManager channel = channel.resourceManager
 
 data ChannelState = ChannelState {
+  resourceManager :: ResourceManager,
   connectionState :: ChannelConnectivity,
   children :: [Channel]
 }
@@ -172,7 +176,7 @@ newMultiplexer side x = do
 -- | Starts a new multiplexer on the provided connection and blocks until it is closed.
 -- The channel is provided to a setup action and can be closed by calling 'closeChannel'; otherwise the multiplexer will run until the underlying connection is closed.
 runMultiplexer :: (IsConnection a) => MultiplexerSide -> (Channel -> IO ()) -> a -> IO ()
-runMultiplexer side channelSetupHook connection = do
+runMultiplexer side channelSetupHook connection = withResourceManager \resourceManager -> do
   -- Running in masked state, this thread (running the receive-function) cannot be interrupted when closing the connection
   maskingState <- getMaskingState
   when (maskingState /= Unmasked) (fail "'runMultiplexer' cannot run in masked thread state.")
@@ -193,7 +197,7 @@ runMultiplexer side channelSetupHook connection = do
     }
     run :: IO (MultiplexerException, Bool)
     run = do
-      rootChannel <- withMultiplexerState worker (newChannel worker 0 Connected)
+      rootChannel <- withMultiplexerState worker (newChannel resourceManager worker 0 Connected)
       channelSetupHook rootChannel
       withAsync (receiveThread worker) $ \receiveTask ->
         withAsync (throwIO =<< multiplexerWaitUntilClosed worker) $ \waitForCloseTask -> do
@@ -424,21 +428,32 @@ multiplexerSwitchChannel worker channelId = do
 
 -- | Closes a channel and all it's children: After the function completes, the channel callback will no longer be called on received messages and sending messages on the channel will fail.
 -- Calling close on a closed channel is a noop.
-channelClose :: Channel -> IO ()
-channelClose channel = do
-  -- Change channel state of all unclosed channels in the tree to closed
-  channelWasClosed <- channelClose' channel
-
-  when channelWasClosed $
-    -- Closing a channel on a Connection that is no longer connected should not throw an exception (channelClose is a resource management operation and is supposed to be idempotent)
-    handle (\(_ :: MultiplexerException) -> pure ()) $ do
-      -- Send close message
-      withMultiplexerState worker $ do
-        multiplexerSwitchChannel worker channel.channelId
-        multiplexerStateSend worker CloseChannel
-
-      -- Terminate the worker when the root channel is closed
-      when (channel.channelId == 0) $ multiplexerClose_ ConnectionClosed worker
+--
+-- Alias for `dispose`.
+channelClose :: Channel -> IO (Awaitable ())
+channelClose = dispose
+
+instance IsDisposable Channel where
+  toDisposable channel = toDisposable channel.resourceManager
+
+disposeChannel :: Channel -> IO (Awaitable ())
+disposeChannel channel = do
+    -- Change channel state of all unclosed channels in the tree to closed
+    channelWasClosed <- channelClose' channel
+
+    when channelWasClosed $
+      -- Closing a channel on a Connection that is no longer connected should not throw an exception (channelClose is a resource management operation and is supposed to be idempotent)
+      handle (\(_ :: MultiplexerException) -> pure ()) $ do
+        -- Send close message
+        withMultiplexerState worker $ do
+          multiplexerSwitchChannel worker channel.channelId
+          multiplexerStateSend worker CloseChannel
+
+        -- Terminate the worker when the root channel is closed
+        when (channel.channelId == 0) $ multiplexerClose_ ConnectionClosed worker
+
+    -- Return completed awaitabe (so this can be inserted into a ResourceManager)
+    pure $ pure ()
   where
     worker = channel.worker
     channelClose' :: Channel -> IO Bool
@@ -529,7 +544,7 @@ newSubChannel :: MultiplexerWorker -> ChannelId -> (StateT ChannelState (StateT
 newSubChannel worker channelId = do
   parentChannelState <- State.get
   -- Holding the parents channelState while initializing the channel will ensure the ChannelConnectivity is inherited atomically
-  createdChannel <- lift $ newChannel worker channelId parentChannelState.connectionState
+  createdChannel <- lift $ newChannel parentChannelState.resourceManager worker channelId parentChannelState.connectionState
 
   let newParentState = parentChannelState{
     children = createdChannel : parentChannelState.children
@@ -537,9 +552,11 @@ newSubChannel worker channelId = do
   State.put newParentState
   pure createdChannel
 
-newChannel :: MultiplexerWorker -> ChannelId -> ChannelConnectivity -> StateT MultiplexerWorkerState IO Channel
-newChannel worker channelId connectionState = do
+newChannel :: ResourceManager -> MultiplexerWorker -> ChannelId -> ChannelConnectivity -> StateT MultiplexerWorkerState IO Channel
+newChannel parentResourceManager worker channelId connectionState = do
+  resourceManager <- newResourceManager parentResourceManager
   stateMVar <- liftIO $ newMVar ChannelState {
+    resourceManager,
     connectionState,
     children = []
   }
@@ -551,6 +568,7 @@ newChannel worker channelId connectionState = do
   }
   handlerAtVar <- liftIO newEmptyAtVar
   let channel = Channel {
+    resourceManager,
     worker,
     channelId,
     stateMVar,
@@ -558,6 +576,7 @@ newChannel worker channelId connectionState = do
     receiveStateMVar,
     handlerAtVar
   }
+  attachDisposeAction resourceManager (disposeChannel channel)
   State.modify $ \multiplexerState -> multiplexerState{channels = HM.insert channelId channel multiplexerState.channels}
   pure channel
 
diff --git a/src/Quasar/Network/Runtime.hs b/src/Quasar/Network/Runtime.hs
index 9a57e1b..3aa9cae 100644
--- a/src/Quasar/Network/Runtime.hs
+++ b/src/Quasar/Network/Runtime.hs
@@ -117,7 +117,7 @@ clientHandleChannelMessage client resources msg = case decodeOrFail msg of
           Nothing -> channelReportProtocolError client.channel ("Received response with invalid request id " <> show requestId)
       callback resp
 
-clientClose :: Client p -> IO ()
+clientClose :: Client p -> IO (Awaitable ())
 clientClose client = channelClose client.channel
 
 clientReportProtocolError :: Client p -> String -> IO a
@@ -156,7 +156,7 @@ streamSend (Stream channel) value = liftIO $ channelSendSimple channel (encode v
 streamSetHandler :: (Binary down, MonadIO m) => Stream up down -> (down -> IO ()) -> m ()
 streamSetHandler (Stream channel) handler = liftIO $ channelSetSimpleHandler channel handler
 
-streamClose :: MonadIO m => Stream up down -> m ()
+streamClose :: MonadIO m => Stream up down -> m (Awaitable ())
 streamClose (Stream channel) = liftIO $ channelClose channel
 
 -- ** Running client and server
diff --git a/src/Quasar/Network/Runtime/Observable.hs b/src/Quasar/Network/Runtime/Observable.hs
index 18e48e5..eb4e7ec 100644
--- a/src/Quasar/Network/Runtime/Observable.hs
+++ b/src/Quasar/Network/Runtime/Observable.hs
@@ -44,7 +44,7 @@ newObservableStub startRetrieveRequest startObserveRequest = pure uncachedObserv
       -- TODO send updates about the connection status
       stream <- startObserveRequest
       streamSetHandler stream (callback . unpackObservableMessage)
-      synchronousDisposable $ streamClose stream
+      newDisposable $ streamClose stream
     retrieveFn :: forall m. MonadAsync m => m (Task v)
     retrieveFn = toTask <$> startRetrieveRequest
 
diff --git a/test/Quasar/Network/MultiplexerSpec.hs b/test/Quasar/Network/MultiplexerSpec.hs
index 399db6b..34717f2 100644
--- a/test/Quasar/Network/MultiplexerSpec.hs
+++ b/test/Quasar/Network/MultiplexerSpec.hs
@@ -6,9 +6,11 @@ import Control.Concurrent.MVar
 import Control.Exception (bracket, mask_)
 import Control.Monad (forever, void, unless)
 import qualified Data.ByteString.Lazy as BSL
-import Prelude
+import Quasar.Awaitable
+import Quasar.Disposable
 import Quasar.Network.Multiplexer
 import Quasar.Network.Connection
+import Quasar.Prelude
 import Network.Socket
 import Test.Hspec
 
@@ -16,17 +18,29 @@ spec :: Spec
 spec = describe "runMultiplexer" $ parallel $ do
   it "can be closed from the channelSetupHook" $ do
     (x, _) <- newDummySocketPair
-    runMultiplexer MultiplexerSideA channelClose x
+    runMultiplexer MultiplexerSideA (await <=< channelClose) x
 
   it "fails when run in masked state" $ do
     (x, _) <- newDummySocketPair
-    mask_ $ runMultiplexer MultiplexerSideA channelClose x `shouldThrow` anyException
+    mask_ $ runMultiplexer MultiplexerSideA (await <=< channelClose) x `shouldThrow` anyException
 
   it "closes when the remote is closed" $ do
     (x, y) <- newDummySocketPair
     concurrently_
       (runMultiplexer MultiplexerSideA (const (pure ())) x)
-      (runMultiplexer MultiplexerSideB channelClose y)
+      (runMultiplexer MultiplexerSideB (await <=< channelClose) y)
+
+  it "can dispose a resource" $ do
+    var <- newAsyncVar
+    (x, _) <- newDummySocketPair
+    runMultiplexer
+      do MultiplexerSideA
+      do
+        \channel -> do
+          attachDisposeAction_ channel.resourceManager (pure () <$ putAsyncVar_ var ())
+          await =<< channelClose channel
+      do x
+    peekAwaitable var `shouldReturn` Just ()
 
   it "can send and receive simple messages" $ do
     recvMVar <- newEmptyMVar
@@ -110,7 +124,7 @@ withEchoServer fn = bracket setup closePair (\(channel, _) -> fn channel)
       configureEchoHandler echoChannel
       pure (mainChannel, echoChannel)
     closePair :: (Channel, Channel) -> IO ()
-    closePair (x, y) = channelClose x >> channelClose y
+    closePair (x, y) = await =<< liftA2 (<>) (channelClose x) (channelClose y)
     configureEchoHandler :: Channel -> IO ()
     configureEchoHandler channel = channelSetHandler channel (echoHandler channel)
     echoHandler :: Channel -> ReceivedMessageResources -> BSL.ByteString -> IO ()
diff --git a/test/Quasar/NetworkSpec.hs b/test/Quasar/NetworkSpec.hs
index 465d1d3..d49dda3 100644
--- a/test/Quasar/NetworkSpec.hs
+++ b/test/Quasar/NetworkSpec.hs
@@ -105,13 +105,12 @@ spec = parallel $ do
   describe "StreamExample" $ do
     it "can open and close a stream" $ do
       withStandaloneClient @StreamExampleProtocol streamExampleProtocolImpl $ \client -> do
-        streamClose =<< createMultiplyStream client
+        await =<< streamClose =<< createMultiplyStream client
 
     it "can open multiple streams in a single rpc call" $ do
       withStandaloneClient @StreamExampleProtocol streamExampleProtocolImpl $ \client -> do
         (stream1, stream2) <- createStreams client
-        streamClose stream1
-        streamClose stream2
+        await =<< liftA2 (<>) (streamClose stream1) (streamClose stream2)
 
     aroundAll (\x -> withStandaloneClient @StreamExampleProtocol streamExampleProtocolImpl $ \client -> do
         resultMVar <- liftIO newEmptyMVar
@@ -175,9 +174,7 @@ spec = parallel $ do
           setObservableVar var 42
           latestShouldBe 42
 
-    it "receives no further updates after disposing the callback registration" $ do
-      pendingWith "not implemented"
-
+    it "receives no further updates after unsubscribing" $ do
       var <- newObservableVar 42
       withStandaloneClient @ObservableExampleProtocol (ObservableExampleProtocolImpl (toObservable var)) $ \client -> do
         resultVar <- newTVarIO ObservableLoading
-- 
GitLab