From d4d81cb77a65172f37a376fda658666e2e4bc8bc Mon Sep 17 00:00:00 2001
From: Jens Nolte <jens@nightmarestudio.de>
Date: Wed, 11 Mar 2020 00:57:17 +0100
Subject: [PATCH] Implement server mirroring

---
 src/QBar/Cli.hs           | 23 +++++++----
 src/QBar/ControlSocket.hs | 60 ++++++++++++++++++++++++++--
 src/QBar/Core.hs          | 17 +++++++-
 src/QBar/Host.hs          | 74 +++++++++++++++++++++++++++++------
 src/QBar/Server.hs        | 82 +++++++++++++++++++++++++--------------
 5 files changed, 202 insertions(+), 54 deletions(-)

diff --git a/src/QBar/Cli.hs b/src/QBar/Cli.hs
index a547117..619eb15 100644
--- a/src/QBar/Cli.hs
+++ b/src/QBar/Cli.hs
@@ -39,16 +39,30 @@ mainParser = do
 barCommandParser :: Parser (MainOptions -> IO ())
 barCommandParser = hsubparser (
     command "server" (info serverCommandParser (progDesc "Start a new server.")) <>
+    command "mirror" (info mirrorCommandParser (progDesc "Mirror the output of a running server.")) <>
     command "pipe" (info pipeClientParser (progDesc "Redirects the stdin of this process to a running bar.")) <>
     command "theme" (info themeCommandParser (progDesc "Change the theme of the running qbar server."))
   )
 
 serverCommandParser :: Parser (MainOptions -> IO ())
 serverCommandParser = hsubparser (
-    command "swaybar" (info (runBarServer <$> barConfigurationParser) (progDesc "Start a new server for swaybar. Should be called by swaybar.")) <>
-    command "i3bar" (info (runBarServer <$> barConfigurationParser) (progDesc "Start a new server for i3bar. Should be called by i3bar.")) <>
+    command "swaybar" (info (runBarServer <$> barConfigurationParser) (progDesc "Start a new server. Should be called by swaybar.")) <>
+    command "i3bar" (info (runBarServer <$> barConfigurationParser) (progDesc "Start a new server. Should be called by i3bar.")) <>
     command "send" (info (sendBlockStream <$> barConfigurationParser) (progDesc "Run blocks on this process but send them to another qbar server."))
   )
+  where
+    barConfigurationParser :: Parser (BarIO ())
+    barConfigurationParser = sequence_ <$> some blockParser
+
+mirrorCommandParser :: Parser (MainOptions -> IO ())
+mirrorCommandParser = hsubparser (
+    command "swaybar" (info (runBarServerMirror <$> barConfigurationParser) (progDesc "Mirror the output of another server. Should be called by swaybar.")) <>
+    command "i3bar" (info (runBarServerMirror <$> barConfigurationParser) (progDesc "Mirror the output of another server. Should be called by i3bar."))
+  )
+  where
+    barConfigurationParser :: Parser (BarIO ())
+    barConfigurationParser = sequence_ <$> many blockParser
+
 
 themeCommandParser :: Parser (MainOptions -> IO ())
 themeCommandParser = sendIpc . SetTheme <$> strArgument (metavar "THEME" <> completeWith (map T.unpack themeNames))
@@ -58,11 +72,6 @@ pipeClientParser = do
   events <- switch $ long "events" <> short 'e' <> help "Also encode events to stdout. Every event will be a JSON-encoded line."
   pure $ runPipeClient events
 
-barConfigurationParser :: Parser (BarIO ())
-barConfigurationParser = do
-  blocks <- some blockParser
-  pure $ sequence_ blocks
-
 blockParser :: Parser (BarIO ())
 blockParser =
   subparser (
diff --git a/src/QBar/ControlSocket.hs b/src/QBar/ControlSocket.hs
index 24ae71c..52f1b97 100644
--- a/src/QBar/ControlSocket.hs
+++ b/src/QBar/ControlSocket.hs
@@ -5,6 +5,7 @@
 {-# LANGUAGE MultiParamTypeClasses #-}
 {-# LANGUAGE TypeFamilies #-}
 {-# LANGUAGE FlexibleContexts #-}
+{-# LANGUAGE InstanceSigs #-}
 
 module QBar.ControlSocket where
 
@@ -25,7 +26,7 @@ import Data.Text.Lazy (pack)
 import qualified Data.Text.Lazy as T
 import Network.Socket
 import Pipes
-import Pipes.Concurrent as PC (Output, spawn', unbounded, fromInput, send, atomically)
+import Pipes.Concurrent as PC (Output, spawn, spawn', unbounded, newest, toOutput, fromInput, send, atomically)
 import Pipes.Parse
 import qualified Pipes.Prelude as PP
 import Pipes.Aeson (decode, DecodingError)
@@ -43,7 +44,7 @@ class (ToJSON (Up s), FromJSON (Up s), ToJSON (Down s), FromJSON (Down s)) => Is
   streamHandler :: s -> BarIO (Consumer (Up s) IO (), Producer (Down s) IO (), IO ())
   toStreamType :: s -> StreamType
 
-  streamClient :: (MonadIO m) => s -> MainOptions -> m (Consumer (Up s) IO (), Producer (Down s) IO ())
+  streamClient :: s -> MainOptions -> BarIO (Consumer (Up s) IO (), Producer (Down s) IO ())
   streamClient s options@MainOptions{verbose} = do
     sock <- liftIO $ connectIpcSocket options
     runEffect (encode (StartStream $ toStreamType s) >-> toSocket sock)
@@ -93,9 +94,10 @@ decodeStreamSafe MainOptions{verbose} inputStream = decodeStream inputStream >->
             Right v -> yield v >> failOnDecodingError'
 
 
-data StreamType = BlockStreamType BlockStream
+data StreamType = BlockStreamType BlockStream | MirrorStreamType MirrorStream
 mapStreamType :: StreamType -> (forall a. IsStream a => a -> b) -> b
 mapStreamType (BlockStreamType a) f = f a
+mapStreamType (MirrorStreamType a) f = f a
 
 
 data BlockStream = BlockStream
@@ -103,6 +105,7 @@ instance IsStream BlockStream where
   type Up BlockStream = [BlockOutput]
   type Down BlockStream = BlockEvent
   toStreamType = BlockStreamType
+  streamHandler :: BlockStream -> BarIO (Consumer [BlockOutput] IO (), Producer BlockEvent IO (), IO ())
   streamHandler _ = do
     (cache, updateCacheC, sealCache) <- newCache'
     (eventOutput, eventInput, eventSeal) <- liftIO $ spawn' unbounded
@@ -136,6 +139,21 @@ instance IsStream BlockStream where
       updateBarP bar = forever $ await >>= yield >> liftIO (updateBarDefault' bar)
 
 
+data MirrorStream = MirrorStream
+instance IsStream MirrorStream where
+  type Up MirrorStream = BlockEvent
+  type Down MirrorStream = [BlockOutput]
+  toStreamType = MirrorStreamType
+  streamHandler :: MirrorStream -> BarIO (Consumer BlockEvent IO (), Producer [BlockOutput] IO (), IO ())
+  streamHandler _ = do
+    (eventOutput, eventInput, eventSeal) <- liftIO $ spawn' unbounded
+    (blockOutput, blockInput, blockSeal) <- liftIO $ spawn' $ newest 1
+    let seal = atomically $ eventSeal >> blockSeal
+
+    attachBarOutput (toOutput blockOutput, fromInput eventInput)
+    return (toOutput eventOutput, fromInput blockInput, seal)
+
+
 data Request = Command Command | StartStream StreamType
 
 data Command = SetTheme T.Text
@@ -181,6 +199,7 @@ $(deriveJSON defaultOptions ''Command)
 $(deriveJSON defaultOptions ''CommandResult)
 $(deriveJSON defaultOptions ''StreamType)
 $(deriveJSON defaultOptions ''BlockStream)
+$(deriveJSON defaultOptions ''MirrorStream)
 
 sendIpc :: Command -> MainOptions -> IO ()
 sendIpc command options@MainOptions{verbose} = do
@@ -202,6 +221,40 @@ sendIpc command options@MainOptions{verbose} = do
 sendBlockStream :: BarIO () -> MainOptions -> IO ()
 sendBlockStream loadBlocks options = runBarHost (streamClient BlockStream options) loadBlocks
 
+addServerMirrorStream :: MainOptions -> BarIO ()
+addServerMirrorStream options = do
+  (blockEventConsumer, blockOutputProducer) <- streamClient MirrorStream options
+
+  (eventOutput, eventInput) <- liftIO $ spawn unbounded
+  bar <- askBar
+
+  task <- liftIO $ async $ runEffect $ fromInput eventInput >-> blockEventConsumer
+  liftIO $ link task
+  prefix <- liftIO $ (<> "_") <$> randomIdentifier
+  addBlockCache $ newCacheIO (blockOutputProducer >-> updateBarP bar >-> attachHandlerP eventOutput prefix)
+  where
+    attachHandlerP :: Output BlockEvent -> Text -> Pipe [BlockOutput] [BlockState] IO ()
+    attachHandlerP eventOutput prefix = attachHandlerP'
+      where
+        attachHandlerP' :: Pipe [BlockOutput] [BlockState] IO ()
+        attachHandlerP' = do
+          outputs <- await
+          yield $ map (\o -> maybe (noHandler o) (attachHandler o) (_blockName o)) outputs
+          attachHandlerP'
+        noHandler :: BlockOutput -> BlockState
+        noHandler output = Just (output, Nothing)
+        attachHandler :: BlockOutput -> Text -> BlockState
+        attachHandler output blockName' = Just (output {_blockName = Just prefixedName}, Just patchedEvent)
+          where
+            patchedEvent :: BlockEventHandler
+            patchedEvent event = liftIO . atomically . void $ PC.send eventOutput $ event {name = blockName'}
+            prefixedName :: Text
+            prefixedName = prefix <> blockName'
+
+    updateBarP :: Bar -> Pipe a a IO ()
+    updateBarP bar = forever $ await >>= yield >> liftIO (updateBarDefault' bar)
+
+
 
 listenUnixSocketAsync :: MainOptions -> Bar -> CommandHandler -> IO (Async ())
 listenUnixSocketAsync options bar commandHandler = async $ listenUnixSocket options bar commandHandler
@@ -246,7 +299,6 @@ listenUnixSocket options@MainOptions{verbose} bar commandHandler = do
 
     handleRequest :: Producer ByteString IO () -> Consumer ByteString IO () -> Request -> BarIO ()
     handleRequest _leftovers responseConsumer (Command command) = liftIO $ runEffect (handleCommand command >-> responseConsumer)
-    --handleRequest leftovers responseConsumer StartBlockStream = blockStreamHandler options leftovers responseConsumer
     handleRequest leftovers responseConsumer (StartStream streamType) = mapStreamType streamType $ \s -> handleByteStream s options leftovers responseConsumer
 
     handleCommand :: Command -> Producer ByteString IO ()
diff --git a/src/QBar/Core.hs b/src/QBar/Core.hs
index b8b0f85..cad9c3f 100644
--- a/src/QBar/Core.hs
+++ b/src/QBar/Core.hs
@@ -73,7 +73,8 @@ type BarIO = SafeT (ReaderT Bar IO)
 data Bar = Bar {
   requestBarUpdate :: BlockUpdateReason -> IO (),
   newBlockChan :: TChan BlockCache,
-  barSleepScheduler :: SleepScheduler
+  barSleepScheduler :: SleepScheduler,
+  attachBarOutputInternal :: (Consumer [BlockOutput] IO (), Producer BlockEvent IO ()) -> IO ()
 }
 instance HasSleepScheduler BarIO where
   askSleepScheduler = barSleepScheduler <$> askBar
@@ -154,6 +155,20 @@ newCache input = newCacheInternal =<< newCache''
           runEffect (input >-> forever (await >>= liftIO . update))
           liftIO seal
 
+-- |Creates a new cache from a producer (over the IO monad) that automatically seals itself when the producer terminates.
+newCacheIO :: Producer [BlockState] IO () -> BlockCache
+newCacheIO input = newCacheInternal =<< newCache''
+  where
+    newCacheInternal :: (BlockCache, [BlockState] -> IO Bool, IO ()) -> BlockCache
+    newCacheInternal (cache, update, seal) = do
+      liftIO $ link =<< async updateTask
+      cache
+      where
+        updateTask :: IO ()
+        updateTask = do
+          runEffect (input >-> forever (await >>= liftIO . update))
+          liftIO seal
+
 -- |Create a new cache. The result is a tuple of the cache, a consumer that can be used to update the cache and an action that seals the cache.
 newCache' :: (MonadIO m, MonadIO m2, MonadIO m3) => m (BlockCache, Consumer [BlockState] m2 (), m3 ())
 newCache' = do
diff --git a/src/QBar/Host.hs b/src/QBar/Host.hs
index f7a04b6..ecfdc77 100644
--- a/src/QBar/Host.hs
+++ b/src/QBar/Host.hs
@@ -8,10 +8,11 @@ import QBar.Core
 import QBar.Time
 
 import Control.Concurrent (forkIO, forkFinally, threadDelay)
-import Control.Concurrent.Async (async, wait)
+import Control.Concurrent.Async (async, wait, waitBoth)
 import qualified Control.Concurrent.Event as Event
 import Control.Concurrent.MVar (MVar, newMVar, modifyMVar_, swapMVar)
-import Control.Concurrent.STM.TChan (TChan, newTChanIO, tryReadTChan)
+import Control.Concurrent.STM.TChan
+import Control.Concurrent.STM.TVar
 import Control.Exception (SomeException, catch)
 import Control.Lens hiding (each, (.=))
 import Control.Monad.STM (atomically)
@@ -19,10 +20,10 @@ import Data.IORef (IORef, newIORef, readIORef, writeIORef)
 import Data.Maybe (catMaybes, mapMaybe)
 import qualified Data.Text.Lazy as T
 import Pipes
+import Pipes.Concurrent (spawn, unbounded, toOutput, fromInput)
 import System.IO (stderr, hPutStrLn)
 import System.Posix.Signals (Handler(..), sigCONT, installHandler)
 
-
 data HostHandle = HostHandle {
   barUpdateEvent :: BarUpdateEvent,
   barUpdatedEvent :: Event.Event,
@@ -161,8 +162,18 @@ requestBarUpdateHandler HostHandle{barUpdateEvent, barUpdatedEvent, followupEven
     signalHost _ = Event.set barUpdateEvent
 
 
+attachBarOutput :: (Consumer [BlockOutput] IO (), Producer BlockEvent IO ()) -> BarIO ()
+attachBarOutput (blockOutputConsumer, blockEventProducer) = do
+  bar <- askBar
+  liftIO $ attachBarOutputInternal bar (blockOutputConsumer, blockEventProducer)
+
+
 runBarHost :: BarIO (Consumer [BlockOutput] IO (), Producer BlockEvent IO ()) -> BarIO () -> IO ()
-runBarHost createHost loadBlocks = do
+runBarHost createHost loadBlocks = runBarHost' $ loadBlocks >> createHost >>= attachBarOutput
+
+
+runBarHost' :: BarIO () -> IO ()
+runBarHost' initializeBarAction = do
   -- Create an event used request bar updates
   barUpdateEvent <- Event.newSet
   -- Create an event that is signaled after bar updates
@@ -185,20 +196,59 @@ runBarHost createHost loadBlocks = do
     eventHandlerListIORef
   }
 
+  (eventOutput, eventInput) <- spawn unbounded
+
+  -- Create cache for block outputs
+  cache <- (,) <$> newTVarIO [] <*> newBroadcastTChanIO
+  let blockOutputProducer = blockOutputFromCache cache
+
+  -- Important: both monads (output producer / event consumer) will be forked whenever a new output connects!
+  let attachBarOutputInternal = attachBarOutputImpl blockOutputProducer (toOutput eventOutput)
+
+
   let requestBarUpdate = requestBarUpdateHandler hostHandle
 
-  let bar = Bar {requestBarUpdate, newBlockChan, barSleepScheduler}
+  let bar = Bar {requestBarUpdate, newBlockChan, barSleepScheduler, attachBarOutputInternal}
 
   -- Install signal handler for SIGCONT
   installSignalHandlers bar
 
-  runBarIO bar loadBlocks
+  -- Load blocks and initialize output handlers
+  runBarIO bar initializeBarAction
+
+  -- Run blocks and send filtered output to connected clients
+  blockTask <- async $ runEffect $ runBlocks bar hostHandle >-> filterDuplicates >-> blockOutputToCache cache
+  -- Dispatch incoming events to blocks
+  eventTask <- async $ runEffect $ fromInput eventInput >-> eventDispatcher bar eventHandlerListIORef
+
+
+  void $ waitBoth blockTask eventTask
+
+  where
+    blockOutputToCache :: (TVar [BlockOutput], TChan [BlockOutput]) -> Consumer [BlockOutput] IO ()
+    blockOutputToCache (var, chan) = forever $ do
+      value <- await
+      liftIO . atomically $ do
+        writeTVar var value
+        writeTChan chan value
+
+    -- Monad will be forked when new outputs connect
+    blockOutputFromCache :: (TVar [BlockOutput], TChan [BlockOutput]) -> Producer [BlockOutput] IO ()
+    blockOutputFromCache (var, chan) = do
+      (outputChan, value) <- liftIO . atomically $ do
+        value <- readTVar var
+        outputChan <- dupTChan chan
+        return (outputChan, value)
+
+      yield value
+
+      forever $ yield =<< (liftIO . atomically $ readTChan outputChan)
 
-  (host, barEventProducer) <- runBarIO bar createHost
+    attachBarOutputImpl :: Producer [BlockOutput] IO () -> Consumer BlockEvent IO () -> (Consumer [BlockOutput] IO (), Producer BlockEvent IO ()) -> IO ()
+    attachBarOutputImpl blockOutputProducer eventConsumer (barOutputConsumer, barEventProducer) = do
 
-  let handleStdin = liftIO $ runEffect $ barEventProducer >-> eventDispatcher bar eventHandlerListIORef
-  -- Fork stdin handler
-  void $ forkFinally (runBarIO bar handleStdin) (\result -> hPutStrLn stderr $ "handleStdin failed: " <> show result)
+      let handleBarEventInput = liftIO $ runEffect $ barEventProducer >-> eventConsumer
+      liftIO $ void $ forkFinally handleBarEventInput (\result -> hPutStrLn stderr $ "An event input handler failed: " <> show result)
 
-  -- Run bar host
-  runEffect $ runBlocks bar hostHandle >-> filterDuplicates >-> host
+      let handleBarOutput = liftIO $ runEffect $ blockOutputProducer >-> filterDuplicates >-> barOutputConsumer
+      liftIO $ void $ forkFinally handleBarOutput (\result -> hPutStrLn stderr $ "A bar output handler failed: " <> show result)
diff --git a/src/QBar/Server.hs b/src/QBar/Server.hs
index 234ccb0..eb58697 100644
--- a/src/QBar/Server.hs
+++ b/src/QBar/Server.hs
@@ -28,6 +28,9 @@ import Pipes.Concurrent (Input, spawn, latest, toOutput, fromInput)
 import qualified Pipes.Prelude as PP
 import System.IO (stdin, stdout, stderr, hFlush)
 
+data ServerMode = Host | Mirror
+data ServerOutput = Sway | Headless
+
 renderIndicators :: [Text]
 renderIndicators = ["*"] <> cycle ["/", "-", "\\", "|"]
 
@@ -116,38 +119,66 @@ swayBarOutput options@MainOptions{indicator} = do
       pangoBlockName = _blockName
     }
 
+runBarServerMirror :: BarIO () -> MainOptions -> IO ()
+runBarServerMirror loadBlocks options = do
+  -- TODO: apply theme from remote
+  (blockConsumer, eventProducer, _setTheme') <- themingBarServer options
+  runBarHost (return (blockConsumer, eventProducer)) $ do
+    addServerMirrorStream options
+    loadBlocks
+
+
 runBarServer :: BarIO () -> MainOptions -> IO ()
-runBarServer loadBlocks options = runBarHost barServer loadBlocks
-  where
-    barServer :: BarIO (Consumer [BlockOutput] IO (), Producer BlockEvent IO ())
-    barServer = do
-      -- Event to render the bar (fired when block output or theme is changed)
-      renderEvent <- liftIO Event.new
+runBarServer loadBlocks options = runBarHost' $ do
+  barServer <- barServerWithSocket options
+  loadBlocks
+  attachBarOutput barServer
 
-      -- Mailbox to store the latest 'BlockOutput's
-      (output, input) <- liftIO $ spawn $ latest []
 
-      -- MVar that holds the current theme, linked to the input from the above mailbox
-      (themedBlockProducerMVar :: MVar (Producer [ThemedBlockOutput] IO (), Bool)) <- liftIO $ newMVar $ throw $ userError "Unexpected behavior: Default theme not set"
+barServerWithSocket :: MainOptions -> BarIO (Consumer [BlockOutput] IO (), Producer BlockEvent IO ())
+barServerWithSocket options = do
+  (blockConsumer, eventProducer, setTheme') <- themingBarServer options
 
-      let setTheme' = setTheme renderEvent input themedBlockProducerMVar
+  bar <- askBar
 
-      -- Set default theme
-      liftIO $ setTheme' defaultTheme
+  -- Create control socket
+  controlSocketAsync <- liftIO $ listenUnixSocketAsync options bar (commandHandler setTheme')
+  liftIO $ link controlSocketAsync
+
+  return (blockConsumer, eventProducer)
+  where
+    commandHandler :: (Theme -> IO ()) -> Command -> IO CommandResult
+    commandHandler setTheme' (SetTheme name) =
+      case findTheme name of
+        Left err -> return $ Error err
+        Right theme -> do
+          setTheme' theme
+          return Success
 
-      bar <- askBar
 
-      -- Create control socket
-      controlSocketAsync <- liftIO $ listenUnixSocketAsync options bar (commandHandler setTheme')
-      liftIO $ link controlSocketAsync
+themingBarServer :: MonadIO m => MainOptions -> m (Consumer [BlockOutput] IO (), Producer BlockEvent IO (), Theme -> IO ())
+themingBarServer options = do
+  -- Event to render the bar (fired when block output or theme is changed)
+  renderEvent <- liftIO Event.new
 
+  -- Mailbox to store the latest 'BlockOutput's
+  (output, input) <- liftIO $ spawn $ latest []
 
-      -- Run render loop
-      liftIO $ link =<< async (renderLoop renderEvent themedBlockProducerMVar)
+  -- MVar that holds the current theme, linked to the input from the above mailbox
+  (themedBlockProducerMVar :: MVar (Producer [ThemedBlockOutput] IO (), Bool)) <- liftIO $ newMVar $ throw $ userError "Unexpected behavior: Default theme not set"
 
-      -- Return a consumer that accepts BlockOutputs from the bar host, moves them to the mailbox and signals the renderer to update the bar.
-      return (signalEventPipe renderEvent >-> toOutput output, swayBarInput options)
+  let setTheme' = setTheme renderEvent input themedBlockProducerMVar
 
+  -- Set default theme
+  liftIO $ setTheme' defaultTheme
+
+  -- Run render loop
+  liftIO $ link =<< async (renderLoop renderEvent themedBlockProducerMVar)
+
+  -- Return a consumer that accepts BlockOutputs from the bar host, moves them to the mailbox and signals the renderer to update the bar.
+  return (signalEventPipe renderEvent >-> toOutput output, swayBarInput options, setTheme')
+
+  where
     renderLoop :: Event.Event -> MVar (Producer [ThemedBlockOutput] IO (), Bool) -> IO ()
     renderLoop renderEvent themedBlockProducerMVar = runEffect $
       themeAnimator renderEvent themedBlockProducerMVar >-> filterDuplicates >-> swayBarOutput options
@@ -168,7 +199,6 @@ runBarServer loadBlocks options = runBarHost barServer loadBlocks
           yield themedBlocks
           liftIO $ if isAnimated''
             -- Limit to 10 FPS because swaybar rendering is surprisingly expensive
-            -- TODO: make FPS configurable
             then void $ Event.waitTimeout renderEvent 100000
             else Event.wait renderEvent
           themeAnimator'
@@ -181,11 +211,3 @@ runBarServer loadBlocks options = runBarHost barServer loadBlocks
         mkThemedBlockProducer :: Theme -> (Producer [ThemedBlockOutput] IO (), Bool)
         mkThemedBlockProducer (StaticTheme themeFn) = (fromInput blockOutputInput >-> PP.map themeFn, False)
         mkThemedBlockProducer (AnimatedTheme themePipe) = (fromInput blockOutputInput >-> themePipe, True)
-
-    commandHandler :: (Theme -> IO ()) -> Command -> IO CommandResult
-    commandHandler setTheme' (SetTheme name) =
-      case findTheme name of
-        Left err -> return $ Error err
-        Right theme -> do
-          setTheme' theme
-          return Success
-- 
GitLab