From f5f81228f5d8b8f8299b5a563e17b2f187245bfe Mon Sep 17 00:00:00 2001
From: Jens Nolte <>
Date: Sat, 14 Mar 2020 02:36:30 +0100
Subject: [PATCH] Implement control socket stream reconnect

 src/QBar/ControlSocket.hs | 55 ++++++++++++++++++++++++++++++++++-----
 src/QBar/Host.hs          | 28 +++-----------------
 src/QBar/Util.hs          | 48 ++++++++++++++++++++++++++++++++++
 3 files changed, 100 insertions(+), 31 deletions(-)

diff --git a/src/QBar/ControlSocket.hs b/src/QBar/ControlSocket.hs
index 6a6328b..3095aa6 100644
--- a/src/QBar/ControlSocket.hs
+++ b/src/QBar/ControlSocket.hs
@@ -13,17 +13,17 @@ module QBar.ControlSocket where
 import QBar.BlockOutput
 import QBar.Core
 import QBar.Host
+import QBar.Time
 import QBar.Util
 import Control.Concurrent (forkFinally)
 import Control.Concurrent.Async
-import Control.Exception (SomeException, handle, catch)
+import Control.Exception (SomeException, IOException, handle)
 import Data.Aeson (FromJSON, ToJSON)
 import Data.Aeson.TH
 import qualified Data.ByteString.Char8 as BSC
-import System.FilePath ((</>))
-import System.IO
 import Data.Text.Lazy (pack)
+import Data.Time.Clock (getCurrentTime, addUTCTime)
 import qualified Data.Text.Lazy as T
 import qualified Data.Text.Lazy.IO as T
 import Network.Socket
@@ -31,11 +31,14 @@ import Pipes
 import Pipes.Concurrent as PC (Output, spawn, spawn', unbounded, newest, toOutput, fromInput, send, atomically)
 import Pipes.Parse
 import qualified Pipes.Prelude as PP
+import Pipes.Safe (catch)
 import Pipes.Aeson (decode, DecodingError)
 import Pipes.Aeson.Unchecked (encode)
 import Pipes.Network.TCP (fromSocket, toSocket)
 import System.Directory (removeFile, doesFileExist)
 import System.Environment (getEnv)
+import System.FilePath ((</>))
+import System.IO
 type CommandHandler = Command -> IO CommandResult
@@ -47,8 +50,8 @@ class (ToJSON (Up s), FromJSON (Up s), ToJSON (Down s), FromJSON (Down s)) => Is
   toStreamType :: s -> StreamType
   streamClient :: s -> MainOptions -> BarIO (Consumer (Up s) IO (), Producer (Down s) IO ())
-  streamClient s options@MainOptions{verbose} = do
-    sock <- liftIO $ connectIpcSocket options
+  streamClient s options@MainOptions{verbose} = liftIO $ do
+    sock <- connectIpcSocket options
     runEffect (encode (StartStream $ toStreamType s) >-> toSocket sock)
     let up = forever (await >>= encode) >-> verbosePrintP >-> toSocket sock
     let down = decodeStreamSafe options (fromSocket sock 4096 >-> verbosePrintP)
@@ -56,6 +59,7 @@ class (ToJSON (Up s), FromJSON (Up s), ToJSON (Down s), FromJSON (Down s)) => Is
       verbosePrintP :: Pipe ByteString ByteString IO ()
       verbosePrintP = if verbose then PP.chain $ BSC.hPutStrLn stderr else cat
   handleByteStream :: s -> MainOptions -> Producer ByteString IO () -> Consumer ByteString IO () -> BarIO ()
   handleByteStream s options up down = do
     (handleUp, handleDown, cleanup) <- streamHandler s
@@ -67,6 +71,40 @@ class (ToJSON (Up s), FromJSON (Up s), ToJSON (Down s), FromJSON (Down s)) => Is
       void $ waitEitherCancel readTask writeTask
+data ReconnectMode a = ReconnectNoResend | ReconnectSendLatest a
+reconnectClient :: forall up down. ReconnectMode up -> BarIO (Consumer up IO (), Producer down IO ()) -> BarIO (Consumer up IO (), Producer down IO ())
+reconnectClient reconnectMode connectClient = do
+  (upConsumer, upProducer) <- case reconnectMode of
+    ReconnectNoResend  -> liftIO $ mkBroadcastP
+    ReconnectSendLatest initial -> liftIO $ mkBroadcastCacheP initial
+  (downOutput, downInput) <- liftIO $ spawn unbounded
+  let downConsumer = toOutput downOutput
+  let downProducer = fromInput downInput
+  task <- barAsync $ forever $ do
+    (upStreamConsumer, downStreamProducer) <- connectRetry
+    liftIO $ do
+      readTask <- async $ runEffect $ downStreamProducer >-> downConsumer
+      writeTask <- async $ runEffect $ upProducer >-> upStreamConsumer
+      void $ waitEitherCancel readTask writeTask
+  liftIO $ link task
+  return (upConsumer, downProducer)
+  where
+    connectRetry :: BarIO (Consumer up IO (), Producer down IO ())
+    connectRetry = catch connectClient (\(_ :: IOException) -> liftIO (hPutStrLn stderr "Socket connection failed. Retrying...") >> reconnectDelay >> silentConnectRetry)
+    silentConnectRetry :: BarIO (Consumer up IO (), Producer down IO ())
+    silentConnectRetry = catch connectClient (\(_ :: IOException) -> reconnectDelay >> silentConnectRetry)
+    reconnectDelay :: BarIO ()
+    reconnectDelay = do
+      time <- liftIO getCurrentTime
+      let nextSecond = addUTCTime 1 time
+      sleepUntil nextSecond
 decodeStreamSafe :: FromJSON v => MainOptions -> Producer ByteString IO () -> Producer v IO ()
 decodeStreamSafe MainOptions{verbose} inputStream = decodeStream inputStream >-> failOnEmptyStream >-> failOnDecodingError
@@ -233,11 +271,14 @@ sendIpc' command options = catch sendCommand handleException
 sendBlockStream :: BarIO () -> MainOptions -> IO ()
-sendBlockStream loadBlocks options = runBarHost (streamClient BlockStream options) loadBlocks
+sendBlockStream loadBlocks options = runBarHost blockStreamClient loadBlocks
+  where
+    blockStreamClient :: BarIO (Consumer [BlockOutput] IO (), Producer BlockEvent IO ())
+    blockStreamClient = reconnectClient (ReconnectSendLatest []) $ streamClient BlockStream options
 addServerMirrorStream :: MainOptions -> BarIO ()
 addServerMirrorStream options = do
-  (blockEventConsumer, blockOutputProducer) <- streamClient MirrorStream options
+  (blockEventConsumer, blockOutputProducer) <- reconnectClient ReconnectNoResend $ streamClient MirrorStream options
   (eventOutput, eventInput) <- liftIO $ spawn unbounded
   bar <- askBar
diff --git a/src/QBar/Host.hs b/src/QBar/Host.hs
index 3524b6d..999c4ea 100644
--- a/src/QBar/Host.hs
+++ b/src/QBar/Host.hs
@@ -6,13 +6,13 @@ module QBar.Host where
 import QBar.BlockOutput
 import QBar.Core
 import QBar.Time
+import QBar.Util
 import Control.Concurrent (forkIO, forkFinally, threadDelay)
 import Control.Concurrent.Async (async, wait, waitBoth)
 import qualified Control.Concurrent.Event as Event
 import Control.Concurrent.MVar (MVar, newMVar, modifyMVar_, swapMVar)
 import Control.Concurrent.STM.TChan
-import Control.Concurrent.STM.TVar
 import Control.Exception (SomeException, catch)
 import Control.Lens hiding (each, (.=))
 import Control.Monad.STM (atomically)
@@ -199,11 +199,10 @@ runBarHost' initializeBarAction = do
   (eventOutput, eventInput) <- spawn unbounded
   -- Create cache for block outputs
-  cache <- (,) <$> newTVarIO [] <*> newBroadcastTChanIO
-  let blockOutputProducer = blockOutputFromCache cache
+  (cacheConsumer, cacheProducer) <- mkBroadcastCacheP []
   -- Important: both monads (output producer / event consumer) will be forked whenever a new output connects!
-  let attachBarOutputInternal = attachBarOutputImpl blockOutputProducer (toOutput eventOutput)
+  let attachBarOutputInternal = attachBarOutputImpl cacheProducer (toOutput eventOutput)
   let requestBarUpdate = requestBarUpdateHandler hostHandle
@@ -217,7 +216,7 @@ runBarHost' initializeBarAction = do
   runBarIO bar initializeBarAction
   -- Run blocks and send filtered output to connected clients
-  blockTask <- async $ runEffect $ runBlocks bar hostHandle >-> filterDuplicates >-> blockOutputToCache cache
+  blockTask <- async $ runEffect $ runBlocks bar hostHandle >-> filterDuplicates >-> cacheConsumer
   -- Dispatch incoming events to blocks
   eventTask <- async $ runEffect $ fromInput eventInput >-> eventDispatcher bar eventHandlerListIORef
@@ -225,25 +224,6 @@ runBarHost' initializeBarAction = do
   void $ waitBoth blockTask eventTask
-    blockOutputToCache :: (TVar [BlockOutput], TChan [BlockOutput]) -> Consumer [BlockOutput] IO ()
-    blockOutputToCache (var, chan) = forever $ do
-      value <- await
-      liftIO . atomically $ do
-        writeTVar var value
-        writeTChan chan value
-    -- Monad will be forked when new outputs connect
-    blockOutputFromCache :: (TVar [BlockOutput], TChan [BlockOutput]) -> Producer [BlockOutput] IO ()
-    blockOutputFromCache (var, chan) = do
-      (outputChan, value) <- liftIO . atomically $ do
-        value <- readTVar var
-        outputChan <- dupTChan chan
-        return (outputChan, value)
-      yield value
-      forever $ yield =<< (liftIO . atomically $ readTChan outputChan)
     attachBarOutputImpl :: Producer [BlockOutput] IO () -> Consumer BlockEvent IO () -> (Consumer [BlockOutput] IO (), Producer BlockEvent IO ()) -> IO ()
     attachBarOutputImpl blockOutputProducer eventConsumer (barOutputConsumer, barEventProducer) = do
diff --git a/src/QBar/Util.hs b/src/QBar/Util.hs
index 8b94dda..d3dae30 100644
--- a/src/QBar/Util.hs
+++ b/src/QBar/Util.hs
@@ -1,6 +1,9 @@
 module QBar.Util where
 import Control.Concurrent.Event as Event
+import Control.Concurrent.STM (atomically)
+import Control.Concurrent.STM.TChan
+import Control.Concurrent.STM.TVar
 import Control.Monad (replicateM)
 import qualified Data.Text.Lazy as T
 import Pipes
@@ -19,3 +22,48 @@ randomIdentifier = liftIO $ T.pack <$> replicateM 8 randomCharacter
       return $ T.index alphabet index
     alphabet :: T.Text
     alphabet = T.pack $ ['A'..'Z'] ++ ['a'..'z'] ++ ['0'..'9']
+-- |Creates a pair of consumer and producer. Both can be used multiple times in parallel.
+-- |All values send to a consumer will be sent to all currently running producers.
+mkBroadcastP :: forall a. IO (Consumer a IO (), Producer a IO ())
+mkBroadcastP = do
+  chan <- newBroadcastTChanIO
+  return (sendToStore chan, recvFromStore chan)
+  where
+    sendToStore :: TChan a -> Consumer a IO ()
+    sendToStore chan = forever $ do
+      value <- await
+      liftIO . atomically $ writeTChan chan value
+    -- Monad will be forked when new outputs connect
+    recvFromStore :: TChan a -> Producer a IO ()
+    recvFromStore chan = do
+      outputChan <- liftIO . atomically $ dupTChan chan
+      forever $ yield =<< (liftIO . atomically $ readTChan outputChan)
+-- |Creates a pair of consumer and producer. Both can be used multiple times in parallel.
+-- |All values send to a consumer will be sent to all currently running producers.
+-- |When running a new producer it will immediateley receive the latest value that was sent to a consumer.
+mkBroadcastCacheP :: forall a. a -> IO (Consumer a IO (), Producer a IO ())
+mkBroadcastCacheP initialValue = do
+  store <- (,) <$> newTVarIO initialValue <*> newBroadcastTChanIO
+  return (sendToStore store, recvFromStore store)
+  where
+    sendToStore :: (TVar a, TChan a) -> Consumer a IO ()
+    sendToStore (var, chan) = forever $ do
+      value <- await
+      liftIO . atomically $ do
+        writeTVar var value
+        writeTChan chan value
+    -- Monad will be forked when new outputs connect
+    recvFromStore :: (TVar a, TChan a) -> Producer a IO ()
+    recvFromStore (var, chan) = do
+      (outputChan, value) <- liftIO . atomically $ do
+        value <- readTVar var
+        outputChan <- dupTChan chan
+        return (outputChan, value)
+      yield value
+      forever $ yield =<< (liftIO . atomically $ readTChan outputChan)