Skip to content
Snippets Groups Projects
Commit f283b693 authored by Legy (Beini)'s avatar Legy (Beini)
Browse files

Implement the Subscribable

parent fe554cfc
No related branches found
No related tags found
No related merge requests found
Pipeline #2490 passed
......@@ -93,6 +93,7 @@ library
Quasar.Prelude
Quasar.PreludeExtras
Quasar.ResourceManager
Quasar.Subscribable
Quasar.Timer
Quasar.Utils.ExtraT
hs-source-dirs:
......@@ -116,5 +117,6 @@ test-suite quasar-test
Quasar.ObservableSpec
Quasar.Observable.ObservableHashMapSpec
Quasar.Observable.ObservablePrioritySpec
Quasar.SubscribableSpec
hs-source-dirs:
test
......@@ -4,6 +4,7 @@ module Quasar (
module Quasar.Disposable,
module Quasar.Observable,
module Quasar.ResourceManager,
module Quasar.Subscribable,
) where
import Quasar.Async
......@@ -11,3 +12,4 @@ import Quasar.Awaitable
import Quasar.Disposable
import Quasar.Observable
import Quasar.ResourceManager
import Quasar.Subscribable
module Quasar.Subscribable (
SubscribableMessage(..),
IsSubscribable(..),
Subscribable,
SubscribableEvent,
newSubscribableEvent,
raiseSubscribableEvent,
) where
import Control.Concurrent.STM
import Control.Monad.Catch
import Control.Monad.Reader
import Data.HashMap.Strict qualified as HM
import Quasar.Awaitable
import Quasar.Disposable
import Quasar.Prelude
import Quasar.ResourceManager
data SubscribableMessage r
= SubscribableUpdate r
| SubscribableNotAvailable SomeException
instance Functor SubscribableMessage where
fmap fn (SubscribableUpdate r) = SubscribableUpdate (fn r)
fmap _ (SubscribableNotAvailable ex) = SubscribableNotAvailable ex
class IsSubscribable r a | a -> r where
toSubscribable :: a -> Subscribable r
toSubscribable x = Subscribable x
subscribe
:: MonadResourceManager m
=> a
-> (forall f. MonadResourceManager f => SubscribableMessage r -> f (Awaitable ()))
-> m ()
subscribe x = subscribe (toSubscribable x)
{-# MINIMAL toSubscribable | subscribe #-}
data Subscribable r where
Subscribable :: IsSubscribable r a => a -> Subscribable r
MappedSubscribable :: IsSubscribable a o => (a -> r) -> o -> Subscribable r
MultiSubscribable :: [Subscribable r] -> Subscribable r
instance IsSubscribable r (Subscribable r) where
toSubscribable = id
subscribe (Subscribable x) callback = subscribe x callback
subscribe (MappedSubscribable fn x) callback = subscribe x (callback . fmap fn)
subscribe (MultiSubscribable xs) callback = forM_ xs (`subscribe` callback)
instance Functor Subscribable where
fmap fn (Subscribable x) = MappedSubscribable fn x
fmap fn (MappedSubscribable fn2 x) = MappedSubscribable (fn . fn2) x
fmap fn x@(MultiSubscribable _) = MappedSubscribable fn x
instance Semigroup (Subscribable r) where
MultiSubscribable [] <> x = x
x <> MultiSubscribable [] = x
MultiSubscribable as <> MultiSubscribable bs = MultiSubscribable $ as <> bs
MultiSubscribable as <> b = MultiSubscribable $ as <> [b]
a <> MultiSubscribable bs = MultiSubscribable $ [a] <> bs
a <> b = MultiSubscribable [a, b]
instance Monoid (Subscribable r) where
mempty = MultiSubscribable []
newtype SubscribableEvent r = SubscribableEvent (TVar (HM.HashMap Unique (SubscribableMessage r -> IO (Awaitable()))))
instance IsSubscribable r (SubscribableEvent r) where
subscribe (SubscribableEvent tvar) callback = mask_ do
key <- liftIO newUnique
resourceManager <- askResourceManager
liftIO $ atomically do
callbackMap <- readTVar tvar
writeTVar tvar $ HM.insert key (\msg -> runReaderT (callback msg) resourceManager) callbackMap
registerDisposable =<< synchronousDisposable (disposeFn key)
where
disposeFn :: Unique -> IO ()
disposeFn key = atomically do
callbackMap <- readTVar tvar
writeTVar tvar $ HM.delete key callbackMap
newSubscribableEvent :: MonadIO m => m (SubscribableEvent r)
newSubscribableEvent = liftIO $ SubscribableEvent <$> newTVarIO HM.empty
raiseSubscribableEvent :: MonadIO m => SubscribableEvent r -> r -> m ()
raiseSubscribableEvent (SubscribableEvent tvar) r = liftIO do
callbackMap <- readTVarIO tvar
awaitables <- forM (HM.elems callbackMap) \callback -> do
callback $ SubscribableUpdate r
await $ mconcat awaitables
module Quasar.SubscribableSpec (
spec,
) where
import Control.Concurrent.STM
import Quasar.Prelude
import Quasar.ResourceManager
import Quasar.Subscribable
import Test.Hspec
spec :: Spec
spec = do
describe "SubscribableEvent" $ parallel do
it "can be subscribed" $ io $ withResourceManagerM do
event <- newSubscribableEvent
resultVar <- liftIO newEmptyTMVarIO
subscribe event $ liftIO . \case
SubscribableUpdate r -> atomically (putTMVar resultVar r) >> mempty
SubscribableNotAvailable ex -> throwIO ex
raiseSubscribableEvent event (42 :: Int)
liftIO $ atomically (tryTakeTMVar resultVar) `shouldReturn` Just 42
it "stops calling the callback after the subscription is disposed" $ io $ withResourceManagerM do
event <- newSubscribableEvent
resultVar <- liftIO $ newEmptyTMVarIO
withSubResourceManagerM do
subscribe event $ liftIO . \case
SubscribableUpdate r -> atomically (putTMVar resultVar r) >> mempty
SubscribableNotAvailable ex -> throwIO ex
raiseSubscribableEvent event (42 :: Int)
liftIO $ atomically (tryTakeTMVar resultVar) `shouldReturn` Just 42
raiseSubscribableEvent event (21 :: Int)
liftIO $ atomically (tryTakeTMVar resultVar) `shouldReturn` Nothing
it "can be fmap'ed" $ io $ withResourceManagerM do
event <- newSubscribableEvent
let subscribable = (* 2) <$> toSubscribable event
resultVar <- liftIO $ newEmptyTMVarIO
subscribe subscribable $ liftIO . \case
SubscribableUpdate r -> atomically (putTMVar resultVar r) >> mempty
SubscribableNotAvailable ex -> throwIO ex
raiseSubscribableEvent event (21 :: Int)
liftIO $ atomically (tryTakeTMVar resultVar) `shouldReturn` Just 42
it "can be combined with other events" $ io $ withResourceManagerM do
event1 <- newSubscribableEvent
event2 <- newSubscribableEvent
let subscribable = toSubscribable event1 <> toSubscribable event2
resultVar <- liftIO $ newEmptyTMVarIO
subscribe subscribable $ liftIO . \case
SubscribableUpdate r -> atomically (putTMVar resultVar r) >> mempty
SubscribableNotAvailable ex -> throwIO ex
raiseSubscribableEvent event1 (21 :: Int)
liftIO $ atomically (tryTakeTMVar resultVar) `shouldReturn` Just 21
raiseSubscribableEvent event2 (42 :: Int)
liftIO $ atomically (tryTakeTMVar resultVar) `shouldReturn` Just 42
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment