Skip to content
Snippets Groups Projects
Commit 7f4d66c8 authored by Jens Nolte's avatar Jens Nolte
Browse files

Rewrite cachePushBlock

parent 2eb74913
No related branches found
No related tags found
No related merge requests found
......@@ -161,6 +161,7 @@ runBarIO bar action = liftIO $ runReaderT (runSafeT action) bar
defaultInterval :: Interval
defaultInterval = everyNSeconds 10
-- |Converts a 'PullBlock' to a 'PushBlock' by running it whenever the 'defaultInterval' is triggered.
schedulePullBlock :: PullBlock -> PushBlock
schedulePullBlock pullBlock = PushMode <$ pullBlock >-> sleepToNextInterval
where
......@@ -189,45 +190,50 @@ schedulePullBlock pullBlock = PushMode <$ pullBlock >-> sleepToNextInterval
triggerOnClick :: Event -> BlockEvent -> BarIO ()
triggerOnClick event _ = liftIO $ Event.signal event
newCache :: Producer [BlockState] IO () -> BlockCache
-- |Creates a new cache from a producer that automatically seals itself when the producer terminates.
newCache :: Producer [BlockState] BarIO () -> BlockCache
newCache input = newCacheInternal =<< newCache''
where
newCacheInternal :: (BlockCache, [BlockState] -> IO Bool, IO ()) -> BlockCache
newCacheInternal (cache, update, seal) = do
liftIO $ link =<< async updateTask
task <- barAsync updateTask
liftIO $ link task
cache
where
updateTask :: IO ()
updateTask :: BarIO ()
updateTask = do
runEffect (input >-> forever (await >>= liftIO . update))
seal
liftIO seal
newCache' :: (MonadIO m) => m (BlockCache, Consumer [BlockState] IO (), IO ())
-- |Create a new cache. The result is a tuple of the cache, a consumer that can be used to update the cache and an action that seals the cache.
newCache' :: (MonadIO m, MonadIO m2, MonadIO m3) => m (BlockCache, Consumer [BlockState] m2 (), m3 ())
newCache' = do
(cache, update, seal) <- newCache''
return (cache, cacheUpdateConsumer update, seal)
where
cacheUpdateConsumer :: ([BlockState] -> IO Bool) -> Consumer [BlockState] IO ()
cacheUpdateConsumer :: MonadIO m2 => ([BlockState] -> IO Bool) -> Consumer [BlockState] m2 ()
cacheUpdateConsumer update = do
v <- await
result <- liftIO $ update v
when result $ cacheUpdateConsumer update
newCache'' :: (MonadIO m) => m (BlockCache, [BlockState] -> IO Bool, IO ())
-- |Low-level function to create a new cache. The result is a tuple of the cache, an action can be used to update the cache (it returns 'False'
-- |if the cache is sealed) and an action that seals the cache.
newCache'' :: (MonadIO m, MonadIO m2, MonadIO m3) => m (BlockCache, [BlockState] -> m2 Bool, m3 ())
newCache'' = do
store <- liftIO $ newMVar (Just [])
newCacheInternal store
where
newCacheInternal :: MonadIO m => MVar (Maybe [BlockState]) -> m (BlockCache, [BlockState] -> IO Bool, IO ())
newCacheInternal :: (MonadIO m, MonadIO m2, MonadIO m3) => MVar (Maybe [BlockState]) -> m (BlockCache, [BlockState] -> m2 Bool, m3 ())
newCacheInternal store = return (cache, update, seal)
where
update :: [BlockState] -> IO Bool
update value = modifyMVar store $ \old ->
update :: MonadIO m => [BlockState] -> m Bool
update value = liftIO $ modifyMVar store $ \old ->
return $ case old of
Nothing -> (Nothing, False)
Just _ -> (Just value, True)
seal :: IO ()
seal = void . swapMVar store $ Nothing
seal :: MonadIO m => m ()
seal = liftIO . void . swapMVar store $ Nothing
cache :: BlockCache
cache = do
v <- liftIO (readMVar store)
......@@ -235,13 +241,15 @@ newCache'' = do
Nothing -> exitCache
Just value -> yield value >> cache
cacheFromInput :: Input BlockState -> BlockCache
cacheFromInput input = do
result <- liftIO $ atomically $ recv input
case result of
Nothing -> exitCache
Just value -> yield [value] >> cacheFromInput input
-- |Creates a cache from a push block.
cachePushBlock :: PushBlock -> BlockCache
cachePushBlock pushBlock = newCache $ () <$ (pushBlock >-> updateBarP)
where
updateBarP :: Pipe BlockUpdate [BlockState] BarIO PushMode
updateBarP = forever $ do
(state, reason) <- await
yield [state]
updateBar reason
modify :: (BlockOutput -> BlockOutput) -> Pipe BlockUpdate BlockUpdate BarIO r
......@@ -287,37 +295,7 @@ updateBarDefault = updateBar DefaultUpdate
updateBarDefault' :: MonadIO m => Bar -> m ()
updateBarDefault' bar = updateBar' bar DefaultUpdate
barAsync :: BarIO a -> BarIO (Async a)
barAsync :: MonadBarIO m => BarIO a -> m (Async a)
barAsync action = do
bar <- askBar
liftIO $ async $ runBarIO bar action
cachePushBlock :: PushBlock -> BlockCache
cachePushBlock pushBlock = lift (next pushBlock) >>= either (const exitCache) withInitialBlock
where
withInitialBlock :: (BlockUpdate, PushBlock) -> BlockCache
withInitialBlock (initialBlockUpdate, pushBlock') = do
let (initialBlockState, _) = initialBlockUpdate
(output, input, seal) <- liftIO $ spawn' $ latest initialBlockState
-- The async could be used to stop the block later, but for now we are just linking it to catch exceptions
task <- lift $ barAsync (sendProducerToMailbox output seal pushBlock')
liftIO $ link task
cacheFromInput input
sendProducerToMailbox :: Output BlockState -> STM () -> PushBlock -> BarIO ()
sendProducerToMailbox output seal pushBlock' = do
-- Send push block output to mailbox until it terminates
void $ runEffect $ for pushBlock' (sendOutputToMailbox output)
-- Then clear the block and seal the mailbox
liftIO $ atomically $ void $ send output Nothing
-- TODO: pass reason from BlockUpdate
updateBarDefault
-- TODO: sealing does prevent a 'latest' mailbox from being read
liftIO $ atomically seal
sendOutputToMailbox :: Output BlockState -> BlockUpdate -> Effect BarIO ()
sendOutputToMailbox output blockUpdate = do
let (state, _reason) = blockUpdate
-- The void is discarding the boolean result that indicates if the mailbox is sealed
-- This is ok because a cached block is never sealed from the receiving side
liftIO $ atomically $ void $ send output state
-- TODO signal update reason to renderer
lift updateBarDefault
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment