Skip to content
Snippets Groups Projects
Commit fe7519ce authored by Jens Nolte's avatar Jens Nolte
Browse files

Fix spawning and destruction of push block producers

parent 9d8f7622
No related branches found
No related tags found
No related merge requests found
......@@ -310,20 +310,29 @@ cachePushBlock barUpdateChannel (PushBlockProducer blockProducer) = CachedBlockP
where
withInitialBlock :: (BlockOutput, BlockProducer) -> BlockProducer
withInitialBlock (initialBlockOutput, blockProducer') = do
(output, input, seal) <- lift $ spawn' $ latest initialBlockOutput
(output, input, seal) <- lift $ spawn' $ latest $ Just initialBlockOutput
-- The async could be used to stop the block later, but for now we are just linking it to catch exceptions
lift $ link =<< async (sendProducerToMailbox output seal blockProducer')
fromInput input
sendProducerToMailbox :: Output BlockOutput -> STM () -> BlockProducer -> IO ()
terminateOnMaybe $ fromInput input
sendProducerToMailbox :: Output (Maybe BlockOutput) -> STM () -> BlockProducer -> IO ()
sendProducerToMailbox output seal blockProducer' = do
runEffect $ for blockProducer' (sendOutputToMailbox output)
atomically $ void $ send output Nothing
updateBar barUpdateChannel
atomically seal
sendOutputToMailbox :: Output BlockOutput -> BlockOutput -> Effect IO ()
sendOutputToMailbox :: Output (Maybe BlockOutput) -> BlockOutput -> Effect IO ()
sendOutputToMailbox output blockOutput = lift $ do
-- The void is discarding the boolean result that indicates if the mailbox is sealed
-- This is ok because a cached block is never sealed from the receiving side
atomically $ void $ send output blockOutput
atomically $ void $ send output $ Just blockOutput
updateBar barUpdateChannel
terminateOnMaybe :: Producer (Maybe a) IO () -> Producer a IO ()
terminateOnMaybe p = do
eitherMaybeValue <- lift $ next p
case eitherMaybeValue of
Right (Just value, newP) -> yield value >> terminateOnMaybe newP
_ -> return ()
blockToCachedBlockProducer :: BarUpdateChannel -> Block -> CachedBlockProducer
blockToCachedBlockProducer barUpdateChannel (PushBlock pushBlockProducer) = cachePushBlock barUpdateChannel pushBlockProducer
......
......@@ -59,8 +59,6 @@ renderLoop options handle@Handle{handleActiveFilter} barUpdateChannel barUpdateE
addNewBlockProducers (newCachedBlockProducer:blockProducers)
renderLoop' :: BS.ByteString -> [CachedBlockProducer] -> IO ()
renderLoop' previousBarOutput' blockProducers = do
blockProducers' <- addNewBlockProducers blockProducers
blockFilter <- readIORef handleActiveFilter
-- Wait for an event (unless the filter is animated)
......@@ -70,6 +68,8 @@ renderLoop options handle@Handle{handleActiveFilter} barUpdateChannel barUpdateE
threadDelay 10000
Event.clear barUpdateEvent
blockProducers' <- addNewBlockProducers blockProducers
(blocks, blockProducers'') <- runBlocks blockProducers'
currentBarOutput <- renderLine options handle blockFilter blocks previousBarOutput'
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment