diff --git a/src/Quasar/Network/Multiplexer.hs b/src/Quasar/Network/Multiplexer.hs index e4889d71dbf4325bf009c3139385961526ccb2a4..0915ecc5d412c22cf927a46f8f299c47529702f6 100644 --- a/src/Quasar/Network/Multiplexer.hs +++ b/src/Quasar/Network/Multiplexer.hs @@ -29,6 +29,7 @@ module Quasar.Network.Multiplexer ( -- ** Exception handling MultiplexerException(..), + MultiplexerDirection(..), ConnectionLostReason(..), ChannelException(..), channelReportProtocolError, @@ -123,7 +124,7 @@ data MultiplexerSide = MultiplexerSideA | MultiplexerSideB data MultiplexerException = ConnectionLost ConnectionLostReason | InvalidMagicBytes BS.ByteString - | LocalException SomeException + | LocalException MultiplexerDirection SomeException | RemoteException String | ProtocolException String | ReceivedProtocolException String @@ -136,6 +137,9 @@ instance Exception MultiplexerException where mconcat ["Magic bytes don't match: Expected ", show magicBytes, ", got ", show received] displayException ex = show ex +data MultiplexerDirection = Sending | Receiving + deriving stock Show + data ConnectionLostReason = SendFailed SomeException | ReceiveFailed SomeException @@ -196,8 +200,8 @@ newRootChannel multiplexer = do pure channel -newChannelSTM :: RawChannel -> ChannelId -> STMc NoRetry '[ChannelException] RawChannel -newChannelSTM parent@RawChannel{multiplexer, quasar=parentQuasar} channelId = +newChannel :: RawChannel -> ChannelId -> STMc NoRetry '[ChannelException] RawChannel +newChannel parent@RawChannel{multiplexer, quasar=parentQuasar} channelId = (flip (catchSTMc @NoRetry @'[ChannelException, FailedToAttachResource])) (\FailedToAttachResource -> throwC ChannelNotConnected) do -- Channels inherit their parents close state parentReceivedCloseMessage <- readTVar parent.receivedCloseMessage @@ -357,18 +361,19 @@ newMultiplexerInternal side connection = disposeOnError do channelsVar <- liftIO $ newTVarIO $ HM.singleton 0 rootChannel multiplexer <- mfix \multiplexer -> do - exSink <- catchSink (multiplexerExceptionHandler multiplexer) <$> askExceptionSink + sendingExSink <- catchSink (multiplexerExceptionHandler multiplexer Sending) <$> askExceptionSink + receivingExSink <- catchSink (multiplexerExceptionHandler multiplexer Receiving) <$> askExceptionSink - receiveTask <- unmanagedAsync exSink $ + receiveTask <- unmanagedAsync receivingExSink $ receiveThread multiplexer (receiveCheckEOF connection) - sendTask <- unmanagedAsync exSink $ + sendTask <- unmanagedAsync sendingExSink $ sendThread multiplexer connection.send let sendThreadCompleted = void $ toFuture sendTask receiveThreadCompleted = void $ toFuture receiveTask - registerDisposeActionIO do + registerDisposeActionIO_ do -- NOTE The network connection should be closed automatically when the root channel is closed. -- This action exists to ensure the network connection is not blocking a resource manager for an unbounded -- amount of time while having enough time to perform a graceful close. @@ -410,18 +415,19 @@ newMultiplexerInternal side connection = disposeOnError do pure (rootChannel, toFuture multiplexerResult) -multiplexerExceptionHandler :: MonadSTMc NoRetry '[] m => Multiplexer -> SomeException -> m () -multiplexerExceptionHandler multiplexer (toMultiplexerException -> ex) = liftSTMc @NoRetry @'[] do +multiplexerExceptionHandler :: MonadSTMc NoRetry '[] m => Multiplexer -> MultiplexerDirection -> SomeException -> m () +multiplexerExceptionHandler multiplexer direction (toMultiplexerException direction -> ex) = liftSTMc @NoRetry @'[] do unlessM (tryFulfillPromise multiplexer.multiplexerException ex) do queueLogError $ "Multiplexer ignored exception: " <> displayException ex <> "\nMultiplexer already failed with: " <> displayException ex disposeEventually_ multiplexer -toMultiplexerException :: SomeException -> MultiplexerException +toMultiplexerException :: MultiplexerDirection -> SomeException -> MultiplexerException -- Exception is a MultiplexerException already -toMultiplexerException (fromException -> Just ex) = ex +toMultiplexerException _ (fromException -> Just ex) = ex -- Otherwise it's a local exception (usually from application code) (may be on a multiplexer thread) -toMultiplexerException (fromException -> Just (AsyncException ex)) = LocalException ex -toMultiplexerException ex = LocalException ex +toMultiplexerException direction (fromException -> Just (AsyncException ex)) = + LocalException direction ex +toMultiplexerException direction ex = LocalException direction ex sendThread :: Multiplexer -> (BSL.ByteString -> IO ()) -> IO () @@ -446,7 +452,7 @@ sendThread multiplexer sendFn = do messages <- swapTVar multiplexer.outbox [] case messages of -- Exit when the receive thread has stopped and there is no error and no message left to send - [] -> pure () <$ awaitSTM multiplexer.receiveThreadCompleted + [] -> pure () <$ readFuture multiplexer.receiveThreadCompleted _ -> pure do bytes <- execWriterT do -- outbox is a list that is used as a queue, so it has to be reversed to preserve the correct order @@ -457,7 +463,7 @@ sendThread multiplexer sendFn = do sendException :: MultiplexerException -> StateT ChannelId IO () sendException (ConnectionLost _) = pure () sendException (InvalidMagicBytes _) = pure () - sendException (LocalException ex) = liftIO $ send $ Binary.put $ InternalError $ show ex + sendException (LocalException _ ex) = liftIO $ send $ Binary.put $ InternalError $ displayException ex sendException (RemoteException _) = pure () sendException (ProtocolException message) = liftIO $ send $ Binary.put $ MultiplexerProtocolError message sendException (ReceivedProtocolException _) = pure () @@ -549,7 +555,7 @@ receiveThread multiplexer readFn = do sentClose <- readTVar channel.sentCloseMessage -- Create channels even if the current channel has been closed, to stay in sync with the remote side createdChannelIds <- stateTVar multiplexer.nextReceiveChannelId (createChannelIds newChannelCount) - createdChannels <- mapM (newChannelSTM channel) createdChannelIds + createdChannels <- mapM (newChannel channel) createdChannelIds pure do -- Receiving messages after the remote side closed a channel is a protocol error when closedByRemote $ protocolException $ @@ -702,7 +708,7 @@ sendRawChannelMessageInternal queueBehavior channel@RawChannel{multiplexer} msgH disposeEventually_ channel createdChannelIds <- stateTVar multiplexer.nextSendChannelId (createChannelIds createChannels) - createdChannels <- liftSTMc $ mapM (newChannelSTM channel) createdChannelIds + createdChannels <- liftSTMc $ mapM (newChannel channel) createdChannelIds let res = SentMessageResources { messageId, @@ -771,9 +777,9 @@ rawChannelSetBinaryHandler channel fn = rawChannelSetInternalHandler channel bin stepDecoder :: Decoder a -> Maybe BS.ByteString -> IO InternalMessageHandler stepDecoder (Fail _ _ errMsg) _ = throwM $ ChannelProtocolException channel.channelId $ "Failed to parse channel message: " <> errMsg stepDecoder (Partial feedFn) chunk@(Just _) = pure $ InternalMessageHandler $ stepDecoder (feedFn chunk) - stepDecoder (Partial feedFn) Nothing = throwM $ ChannelProtocolException channel.channelId $ "End of message has been reached but decoder expects more data" - stepDecoder (Done "" _ result) Nothing = InternalMessageHandler (const impossibleCodePathM) <$ runHandler result - stepDecoder (Done _ bytesRead msg) _ = throwM $ ChannelProtocolException channel.channelId $ + stepDecoder (Partial _feedFn) Nothing = throwM $ ChannelProtocolException channel.channelId $ "End of message has been reached but decoder expects more data" + stepDecoder (Done "" _ result) Nothing = InternalMessageHandler (const unreachableCodePathM) <$ runHandler result + stepDecoder (Done _ bytesRead _msg) _ = throwM $ ChannelProtocolException channel.channelId $ mconcat ["Decoder failed to consume complete message (", show (fromIntegral resources.messageLength - bytesRead), " bytes left)"] runHandler :: a -> IO ()