From 109cbc0b97f8f0f1d2c3d3ea8d18507bee4e3b6d Mon Sep 17 00:00:00 2001 From: Jens Nolte <jens@nightmarestudio.de> Date: Tue, 18 Feb 2020 04:01:38 +0100 Subject: [PATCH] Implement remote-display of blocks in another qbar server instance --- package.yaml | 2 + src/QBar/Cli.hs | 3 +- src/QBar/ControlSocket.hs | 189 +++++++++++++++++++++++++++++++------- src/QBar/Core.hs | 66 +++++++++++-- src/QBar/Host.hs | 14 ++- src/QBar/Server.hs | 16 ++-- src/QBar/Util.hs | 13 +++ 7 files changed, 245 insertions(+), 58 deletions(-) diff --git a/package.yaml b/package.yaml index a6911b8..2183ddf 100644 --- a/package.yaml +++ b/package.yaml @@ -39,6 +39,7 @@ dependencies: - pipes-network - pipes-parse - pipes-safe +- random - stm - text - time @@ -49,6 +50,7 @@ dependencies: default-extensions: - OverloadedStrings - NamedFieldPuns +- LambdaCase ghc-options: - -fwarn-unused-do-bind diff --git a/src/QBar/Cli.hs b/src/QBar/Cli.hs index c9f421d..986afc3 100644 --- a/src/QBar/Cli.hs +++ b/src/QBar/Cli.hs @@ -8,11 +8,12 @@ import qualified Data.Text as T import qualified Data.Text.Lazy as TL import Options.Applicative -data BarCommand = BarServerCommand | SetThemeCommand Text +data BarCommand = BarServerCommand | SetThemeCommand Text | ConnectSocket barCommandParser :: Parser BarCommand barCommandParser = hsubparser ( command "server" (info (pure BarServerCommand) (progDesc "Start a new qbar server. Should be called by swaybar, i3bar or or another i3bar-protocol compatible host process.")) <> + command "connect" (info (pure ConnectSocket) (progDesc "Run blocks on this process but display them on the qbar server.")) <> command "theme" (info themeCommandParser (progDesc "Change the theme of the running qbar server.")) <> command "default" (info (pure $ SetThemeCommand "default") (progDesc "Shortcut for 'qbar theme default'.")) <> command "rainbow" (info (pure $ SetThemeCommand "rainbow") (progDesc "Shortcut for 'qbar theme rainbow'.")) diff --git a/src/QBar/ControlSocket.hs b/src/QBar/ControlSocket.hs index 81b3b1d..8faa26a 100644 --- a/src/QBar/ControlSocket.hs +++ b/src/QBar/ControlSocket.hs @@ -1,18 +1,27 @@ {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE TemplateHaskell #-} +{-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE Rank2Types #-} +{-# LANGUAGE MultiParamTypeClasses #-} +{-# LANGUAGE TypeFamilies #-} +{-# LANGUAGE FlexibleContexts #-} module QBar.ControlSocket where +import QBar.BlockOutput import QBar.Cli (MainOptions(..)) import QBar.Core -import QBar.BlockOutput +import QBar.Host +import QBar.Util -import Control.Exception (handle) +import Control.Exception (SomeException, handle) import Control.Monad (forever, void, when) import Control.Concurrent (forkFinally) import Control.Concurrent.Async +import Data.Aeson (FromJSON, ToJSON) import Data.Aeson.TH import Data.ByteString (ByteString) +import qualified Data.ByteString.Char8 as BSC import System.FilePath ((</>)) import System.IO import Data.Either (either) @@ -22,7 +31,9 @@ import qualified Data.Text as T import qualified Data.Text.Lazy as TL import Network.Socket import Pipes +import Pipes.Concurrent as PC (Output, spawn', unbounded, fromInput, send, atomically) import Pipes.Parse +import qualified Pipes.Prelude as PP import Pipes.Aeson (decode, DecodingError) import Pipes.Aeson.Unchecked (encode) import Pipes.Network.TCP (fromSocket, toSocket) @@ -31,8 +42,108 @@ import System.Environment (getEnv) type CommandHandler = Command -> IO CommandResult -data Request = Command Command | ConnectBarHost - deriving Show + +class (ToJSON (Up s), FromJSON (Up s), ToJSON (Down s), FromJSON (Down s)) => IsStream s where + type Up s + type Down s + streamHandler :: s -> BarIO (Consumer (Up s) IO (), Producer (Down s) IO (), IO ()) + toStreamType :: s -> StreamType + + streamClient :: (MonadIO m) => s -> MainOptions -> m (Consumer (Up s) IO (), Producer (Down s) IO ()) + streamClient s options@MainOptions{verbose} = do + sock <- liftIO $ connectIpcSocket options + runEffect (encode (StartStream $ toStreamType s) >-> toSocket sock) + let up = forever (await >>= encode) >-> verbosePrintP >-> toSocket sock + let down = decodeStreamSafe options (fromSocket sock 4096) + return (up, down) + where + verbosePrintP :: Pipe ByteString ByteString IO () + verbosePrintP = if verbose then (PP.chain $ BSC.hPutStrLn stderr) else cat + handleByteStream :: s -> MainOptions -> Producer ByteString IO () -> Consumer ByteString IO () -> BarIO () + handleByteStream s options up down = do + (handleUp, handleDown, cleanup) <- streamHandler s + readTask <- liftIO $ async $ runEffect $ + decodeStreamSafe options up >-> handleUp + writeTask <- liftIO $ async $ runEffect $ + handleDown >-> forever (await >>= encode) >-> down + liftIO $ do + void $ waitEitherCancel readTask writeTask + cleanup + + +decodeStreamSafe :: FromJSON v => MainOptions -> Producer ByteString IO () -> Producer v IO () +decodeStreamSafe MainOptions{verbose} inputStream = decodeStream inputStream >-> failOnEmptyStream >-> failOnDecodingError + where + decodeStream :: FromJSON v => Producer ByteString IO () -> Producer (Maybe (Either DecodingError v)) IO () + decodeStream inputStream' = do + (maybeDecodeResult, leftovers) <- liftIO $ runStateT decode inputStream' + yield maybeDecodeResult + decodeStream leftovers + + failOnEmptyStream :: Pipe (Maybe a) a IO () + failOnEmptyStream = failOnEmptyStream' + where + failOnEmptyStream' = do + m <- await + case m of + Nothing -> liftIO $ when verbose $ hPutStrLn stderr "Ipc connection stream failed: Stream ended" + Just v -> yield v >> failOnEmptyStream' + + failOnDecodingError :: Pipe (Either DecodingError a) a IO () + failOnDecodingError = failOnDecodingError' + where + failOnDecodingError' = do + m <- await + case m of + Left err -> liftIO $ when verbose $ hPutStrLn stderr $ "Ipc connection stream decoding failed: " <> show err + Right v -> yield v >> failOnDecodingError' + + +data StreamType = BlockStreamType BlockStream +mapStreamType :: StreamType -> (forall a. IsStream a => a -> b) -> b +mapStreamType (BlockStreamType a) f = f a + + +data BlockStream = BlockStream +instance IsStream BlockStream where + type Up BlockStream = [BlockOutput] + type Down BlockStream = BlockEvent + toStreamType = BlockStreamType + streamHandler _ = do + (cache, updateC, seal) <- newCache' + (eventOutput, eventInput, eventSeal) <- liftIO $ spawn' unbounded + bar <- askBar + addBlock cache + prefix <- liftIO $ (<> "_") <$> randomIdentifier + return (updateBarP bar >-> attachHandlerP eventOutput prefix >-> updateC, fromInput eventInput, seal >> atomically eventSeal) + where + attachHandlerP :: Output BlockEvent -> Text -> Pipe [BlockOutput] [BlockState] IO () + attachHandlerP eventOutput prefix = attachHandlerP' + where + attachHandlerP' :: Pipe [BlockOutput] [BlockState] IO () + attachHandlerP' = do + outputs <- await + yield $ map (\o -> maybe (noHandler o) (attachHandler o) (_blockName o)) outputs + attachHandlerP' + noHandler :: BlockOutput -> BlockState + noHandler output = Just (output, Nothing) + attachHandler :: BlockOutput -> Text -> BlockState + attachHandler output blockName' = Just (output {_blockName = Just prefixedName}, Just patchedEvent) + where + patchedEvent :: BlockEventHandler + patchedEvent event = liftIO . atomically . void $ PC.send eventOutput $ event {name = blockName'} + prefixedName :: Text + prefixedName = prefix <> blockName' + + updateBarP :: Bar -> Pipe a a IO () + updateBarP bar = do + v <- await + yield v + liftIO $ updateBar' bar + updateBarP bar + + +data Request = Command Command | StartStream StreamType data Command = SetTheme TL.Text deriving Show @@ -40,10 +151,6 @@ data Command = SetTheme TL.Text data CommandResult = Success | Error Text deriving Show -$(deriveJSON defaultOptions ''Request) -$(deriveJSON defaultOptions ''Command) -$(deriveJSON defaultOptions ''CommandResult) - ipcSocketAddress :: MainOptions -> IO FilePath ipcSocketAddress MainOptions{socketLocation} = maybe defaultSocketPath (return . T.unpack) socketLocation @@ -69,12 +176,23 @@ ipcSocketAddress MainOptions{socketLocation} = maybe defaultSocketPath (return . handleEnvError :: IO FilePath -> IO (Maybe FilePath) handleEnvError = handle (const $ return Nothing :: IOError -> IO (Maybe FilePath)) . fmap Just -sendIpc :: MainOptions -> Command -> IO () -sendIpc options@MainOptions{verbose} command = do - let request = Command command +connectIpcSocket :: MainOptions -> IO Socket +connectIpcSocket options = do socketPath <- ipcSocketAddress options sock <- socket AF_UNIX Stream defaultProtocol connect sock $ SockAddrUnix socketPath + return sock + +$(deriveJSON defaultOptions ''Request) +$(deriveJSON defaultOptions ''Command) +$(deriveJSON defaultOptions ''CommandResult) +$(deriveJSON defaultOptions ''StreamType) +$(deriveJSON defaultOptions ''BlockStream) + +sendIpc :: MainOptions -> Command -> IO () +sendIpc options@MainOptions{verbose} command = do + let request = Command command + sock <- connectIpcSocket options runEffect $ encode request >-> toSocket sock decodeResult <- evalStateT decode $ fromSocket sock 4096 @@ -88,11 +206,15 @@ sendIpc options@MainOptions{verbose} command = do showResponse Success = when verbose $ hPutStrLn stderr "Success" showResponse (Error message) = hPrint stderr message -listenUnixSocketAsync :: MainOptions -> CommandHandler -> IO (Async ()) -listenUnixSocketAsync options commandHandler = async $ listenUnixSocket options commandHandler +sendBlockStream :: MainOptions -> BarIO () -> IO () +sendBlockStream = runBarHost . streamClient BlockStream -listenUnixSocket :: MainOptions -> CommandHandler -> IO () -listenUnixSocket options commandHandler = do + +listenUnixSocketAsync :: MainOptions -> Bar -> CommandHandler -> IO (Async ()) +listenUnixSocketAsync options bar commandHandler = async $ listenUnixSocket options bar commandHandler + +listenUnixSocket :: MainOptions -> Bar -> CommandHandler -> IO () +listenUnixSocket options@MainOptions{verbose} bar commandHandler = do socketPath <- ipcSocketAddress options hPutStrLn stderr $ "Creating control socket at " <> socketPath socketExists <- doesFileExist socketPath @@ -103,27 +225,36 @@ listenUnixSocket options commandHandler = do listen sock 5 forever $ do (conn, _) <- accept sock - void $ forkFinally (socketHandler conn) (\_ -> close conn) + void $ forkFinally (socketHandler conn) (handleSocketResult conn) where + handleSocketResult :: Socket -> Either SomeException () -> IO () + handleSocketResult conn (Left err) = do + when verbose $ hPutStrLn stderr $ "Ipc connection closed with error " <> show err + close conn + handleSocketResult conn (Right ()) = do + when verbose $ hPutStrLn stderr "Ipc connection closed" + close conn socketHandler :: Socket -> IO () - socketHandler sock = streamHandler (fromSocket sock 4096) (toSocket sock) - streamHandler :: Producer ByteString IO () -> Consumer ByteString IO () -> IO () - streamHandler producer responseConsumer = do + socketHandler conn = do + when verbose $ hPutStrLn stderr "Ipc connection created" + socketHandler' (fromSocket conn 4096) (toSocket conn) + socketHandler' :: Producer ByteString IO () -> Consumer ByteString IO () -> IO () + socketHandler' producer responseConsumer = do (maybeDecodeResult, leftovers) <- runStateT decode producer -- Handle empty result case maybeDecodeResult of Nothing -> reply $ errorResponse "Empty stream" Just decodeResult -> case decodeResult of Left err -> reply $ handleError err - Right request -> handleRequest leftovers responseConsumer request + Right request -> runBarIO bar $ handleRequest leftovers responseConsumer request where reply :: Producer ByteString IO () -> IO () reply response = runEffect (response >-> responseConsumer) - handleRequest :: Producer ByteString IO () -> Consumer ByteString IO () -> Request -> IO () - handleRequest _leftovers responseConsumer (Command command) = runEffect (handleCommand command >-> responseConsumer) - --handleRequest leftovers Block = addBlock $ handleBlockStream leftovers - handleRequest _leftovers _responseConsumer ConnectBarHost = error "TODO" + handleRequest :: Producer ByteString IO () -> Consumer ByteString IO () -> Request -> BarIO () + handleRequest _leftovers responseConsumer (Command command) = liftIO $ runEffect (handleCommand command >-> responseConsumer) + --handleRequest leftovers responseConsumer StartBlockStream = blockStreamHandler options leftovers responseConsumer + handleRequest leftovers responseConsumer (StartStream streamType) = mapStreamType streamType $ \s -> handleByteStream s options leftovers responseConsumer handleCommand :: Command -> Producer ByteString IO () handleCommand command = do @@ -133,13 +264,3 @@ listenUnixSocket options commandHandler = do handleError = encode . Error . pack . show errorResponse :: Text -> Producer ByteString IO () errorResponse message = encode $ Error message - -handleBlockStream :: Producer ByteString IO () -> PushBlock -handleBlockStream producer = do - (decodeResult, leftovers) <- liftIO $ runStateT decode producer - maybe exitBlock (either (const exitBlock) (handleParsedBlock leftovers)) decodeResult - where - handleParsedBlock :: Producer ByteString IO () -> String -> PushBlock - handleParsedBlock leftovers update = do - updateBlock $ mkBlockOutput . normalText $ TL.pack update - handleBlockStream leftovers diff --git a/src/QBar/Core.hs b/src/QBar/Core.hs index 4606b2b..b25a1e6 100644 --- a/src/QBar/Core.hs +++ b/src/QBar/Core.hs @@ -12,7 +12,7 @@ import Control.Concurrent.MVar import Control.Concurrent.STM.TChan (TChan, writeTChan) import Control.Exception (IOException) import Control.Lens -import Control.Monad (forever) +import Control.Monad (forever, when) import Control.Monad.Reader (ReaderT, runReaderT, ask) import Control.Monad.State (StateT) import Control.Monad.Writer (WriterT) @@ -137,6 +137,62 @@ updateEventHandler eventHandler (Just (blockOutput, _)) = Just (blockOutput, Jus runBarIO :: Bar -> BarIO r -> IO r runBarIO bar action = runReaderT (runSafeT action) bar + +newCache :: Producer [BlockState] IO () -> BlockCache +newCache input = newCacheInternal =<< newCache'' + where + newCacheInternal :: (BlockCache, [BlockState] -> IO Bool, IO ()) -> BlockCache + newCacheInternal (cache, update, seal) = do + liftIO $ link =<< async updateTask + cache + where + updateTask :: IO () + updateTask = do + runEffect (input >-> forever (await >>= liftIO . update)) + seal + +newCache' :: (MonadIO m) => m (BlockCache, Consumer [BlockState] IO (), IO ()) +newCache' = do + (cache, update, seal) <- newCache'' + return (cache, cacheUpdateConsumer update, seal) + where + cacheUpdateConsumer :: ([BlockState] -> IO Bool) -> Consumer [BlockState] IO () + cacheUpdateConsumer update = do + v <- await + result <- liftIO $ update v + when result $ cacheUpdateConsumer update + +newCache'' :: (MonadIO m) => m (BlockCache, [BlockState] -> IO Bool, IO ()) +newCache'' = do + store <- liftIO $ newMVar (Just []) + newCacheInternal store + where + newCacheInternal :: MonadIO m => MVar (Maybe [BlockState]) -> m (BlockCache, [BlockState] -> IO Bool, IO ()) + newCacheInternal store = return (cache, update, seal) + where + update :: [BlockState] -> IO Bool + update value = modifyMVar store $ \old -> + return $ case old of + Nothing -> (Nothing, False) + Just _ -> (Just value, True) + seal :: IO () + seal = void . swapMVar store $ Nothing + cache :: BlockCache + cache = do + v <- liftIO (readMVar store) + case v of + Nothing -> exitCache + Just value -> yield value >> cache + + +cacheFromInput :: Input BlockState -> BlockCache +cacheFromInput input = do + result <- liftIO $ atomically $ recv input + case result of + Nothing -> exitCache + Just value -> yield [value] >> cacheFromInput input + + modify :: (BlockOutput -> BlockOutput) -> Pipe BlockState BlockState BarIO r modify x = PP.map (over (_Just . _1) x) @@ -162,13 +218,6 @@ autoPadding = autoPadding' 0 0 padShortText :: Int64 -> BlockOutput -> BlockOutput padShortText len = over (shortText._Just) $ \s -> padString (len - printedLength s) <> s -cacheFromInput :: Input BlockState -> BlockCache -cacheFromInput input = do - result <- liftIO $ atomically $ recv input - case result of - Nothing -> exitCache - Just value -> yield [value] >> cacheFromInput input - -- | Create a shared interval. Takes a BarUpdateChannel to signal bar updates and an interval (in seconds).Data.Maybe -- Returns an IO action that can be used to attach blocks to the shared interval and an async that contains a reference to the scheduler thread. @@ -330,6 +379,7 @@ cachePushBlock pushBlock = lift (next pushBlock) >>= either (const exitCache) wi -- Then clear the block and seal the mailbox liftIO $ atomically $ void $ send output Nothing updateBar + -- TODO: sealing does prevent a 'latest' mailbox from being read liftIO $ atomically seal sendOutputToMailbox :: Output BlockState -> BlockState -> Effect BarIO () sendOutputToMailbox output blockOutput = do diff --git a/src/QBar/Host.hs b/src/QBar/Host.hs index c465bf7..2ba09a0 100644 --- a/src/QBar/Host.hs +++ b/src/QBar/Host.hs @@ -63,9 +63,9 @@ runBlocks bar HostHandle{barUpdateEvent, newBlockChan, eventHandlerListIORef} = threadDelay 10000 Event.clear barUpdateEvent - blocks' <- liftIO $ runBarIO bar $ addNewBlocks blocks + blocks' <- lift $ runBarIO bar $ addNewBlocks blocks - (blockStates, blocks'') <- liftIO $ runBarIO bar $ getBlockStates blocks' + (blockStates, blocks'') <- lift $ runBarIO bar $ getBlockStates blocks' -- Pass blocks to output yield $ map fst $ catMaybes blockStates @@ -126,11 +126,8 @@ filterDuplicates = do filterDuplicates' value -runBarHost :: Consumer [BlockOutput] IO () - -> Producer BlockEvent IO () - -> BarIO () - -> IO () -runBarHost host barEventProducer loadBlocks = do +runBarHost :: BarIO (Consumer [BlockOutput] IO (), Producer BlockEvent IO ()) -> BarIO () -> IO () +runBarHost createHost loadBlocks = do -- Create an event used to signal bar updates barUpdateEvent <- Event.newSet let requestBarUpdate = Event.set barUpdateEvent @@ -154,10 +151,11 @@ runBarHost host barEventProducer loadBlocks = do runBarIO bar loadBlocks + (host, barEventProducer) <- runBarIO bar createHost + let handleStdin = liftIO $ runEffect $ barEventProducer >-> eventDispatcher bar eventHandlerListIORef -- Fork stdin handler void $ forkFinally (runBarIO bar handleStdin) (\result -> hPutStrLn stderr $ "handleStdin failed: " <> show result) -- Run bar host runEffect $ runBlocks bar hostHandle >-> filterDuplicates >-> host - diff --git a/src/QBar/Server.hs b/src/QBar/Server.hs index 5d016ce..899d3ca 100644 --- a/src/QBar/Server.hs +++ b/src/QBar/Server.hs @@ -117,10 +117,10 @@ swayBarOutput options@MainOptions{indicator} = do pangoBlockName = _blockName } -runBarServer :: BarIO () -> MainOptions -> IO () -runBarServer defaultBarConfig options = runBarHost barServer (swayBarInput options) defaultBarConfig +runBarServer :: MainOptions -> BarIO () -> IO () +runBarServer options = runBarHost barServer where - barServer :: Consumer [BlockOutput] IO () + barServer :: BarIO (Consumer [BlockOutput] IO (), Producer BlockEvent IO ()) barServer = do -- Event to render the bar (fired when block output or theme is changed) renderEvent <- liftIO Event.new @@ -136,8 +136,10 @@ runBarServer defaultBarConfig options = runBarHost barServer (swayBarInput optio -- Set default theme liftIO $ setTheme' defaultTheme + bar <- askBar + -- Create control socket - controlSocketAsync <- liftIO $ listenUnixSocketAsync options (commandHandler setTheme') + controlSocketAsync <- liftIO $ listenUnixSocketAsync options bar (commandHandler setTheme') liftIO $ link controlSocketAsync @@ -145,7 +147,7 @@ runBarServer defaultBarConfig options = runBarHost barServer (swayBarInput optio liftIO $ link =<< async (renderLoop renderEvent themedBlockProducerMVar) -- Return a consumer that accepts BlockOutputs from the bar host, moves them to the mailbox and signals the renderer to update the bar. - signalPipe renderEvent >-> toOutput output + return (signalPipe renderEvent >-> toOutput output, swayBarInput options) renderLoop :: Event.Event -> MVar (Producer [ThemedBlockOutput] IO (), Bool) -> IO () renderLoop renderEvent themedBlockProducerMVar = runEffect $ @@ -190,10 +192,10 @@ runBarServer defaultBarConfig options = runBarHost barServer (swayBarInput optio return Success - -- |Entry point. runQBar :: BarIO () -> MainOptions -> IO () runQBar barConfiguration options@MainOptions{barCommand} = runCommand barCommand where - runCommand BarServerCommand = runBarServer barConfiguration options + runCommand BarServerCommand = runBarServer options barConfiguration + runCommand ConnectSocket = sendBlockStream options barConfiguration runCommand (SetThemeCommand themeName) = sendIpc options $ SetTheme themeName diff --git a/src/QBar/Util.hs b/src/QBar/Util.hs index 72d211f..8ad7546 100644 --- a/src/QBar/Util.hs +++ b/src/QBar/Util.hs @@ -1,7 +1,10 @@ module QBar.Util where import Control.Concurrent.Event as Event +import Control.Monad (replicateM) +import qualified Data.Text.Lazy as T import Pipes +import System.Random -- Pipe that signals an 'Event' after every value that passes through signalPipe :: MonadIO m => Event.Event -> Pipe a a m r @@ -13,3 +16,13 @@ signalPipe event = signalPipe' yield value liftIO $ Event.signal event signalPipe' + +randomIdentifier :: IO Text +randomIdentifier = T.pack <$> replicateM 8 randomCharacter + where + randomCharacter :: IO Char + randomCharacter = do + index <- randomRIO (0, T.length alphabet - 1) + return $ T.index alphabet index + alphabet :: T.Text + alphabet = T.pack $ ['A'..'Z'] ++ ['a'..'z'] ++ ['0'..'9'] -- GitLab