From f283b693e81987a20f78798c60e4738c16ffbf8d Mon Sep 17 00:00:00 2001
From: Jan Beinke <git@janbeinke.com>
Date: Fri, 24 Sep 2021 19:57:31 +0200
Subject: [PATCH] Implement the Subscribable

---
 quasar.cabal                    |  2 +
 src/Quasar.hs                   |  2 +
 src/Quasar/Subscribable.hs      | 92 +++++++++++++++++++++++++++++++++
 test/Quasar/SubscribableSpec.hs | 58 +++++++++++++++++++++
 4 files changed, 154 insertions(+)
 create mode 100644 src/Quasar/Subscribable.hs
 create mode 100644 test/Quasar/SubscribableSpec.hs

diff --git a/quasar.cabal b/quasar.cabal
index 5dcaca7..ddd2245 100644
--- a/quasar.cabal
+++ b/quasar.cabal
@@ -93,6 +93,7 @@ library
     Quasar.Prelude
     Quasar.PreludeExtras
     Quasar.ResourceManager
+    Quasar.Subscribable
     Quasar.Timer
     Quasar.Utils.ExtraT
   hs-source-dirs:
@@ -116,5 +117,6 @@ test-suite quasar-test
     Quasar.ObservableSpec
     Quasar.Observable.ObservableHashMapSpec
     Quasar.Observable.ObservablePrioritySpec
+    Quasar.SubscribableSpec
   hs-source-dirs:
     test
diff --git a/src/Quasar.hs b/src/Quasar.hs
index dedb639..a7e04de 100644
--- a/src/Quasar.hs
+++ b/src/Quasar.hs
@@ -4,6 +4,7 @@ module Quasar (
   module Quasar.Disposable,
   module Quasar.Observable,
   module Quasar.ResourceManager,
+  module Quasar.Subscribable,
 ) where
 
 import Quasar.Async
@@ -11,3 +12,4 @@ import Quasar.Awaitable
 import Quasar.Disposable
 import Quasar.Observable
 import Quasar.ResourceManager
+import Quasar.Subscribable
diff --git a/src/Quasar/Subscribable.hs b/src/Quasar/Subscribable.hs
new file mode 100644
index 0000000..dece28e
--- /dev/null
+++ b/src/Quasar/Subscribable.hs
@@ -0,0 +1,92 @@
+module Quasar.Subscribable (
+  SubscribableMessage(..),
+  IsSubscribable(..),
+  Subscribable,
+  SubscribableEvent,
+  newSubscribableEvent,
+  raiseSubscribableEvent,
+) where
+
+import Control.Concurrent.STM
+import Control.Monad.Catch
+import Control.Monad.Reader
+import Data.HashMap.Strict qualified as HM
+import Quasar.Awaitable
+import Quasar.Disposable
+import Quasar.Prelude
+import Quasar.ResourceManager
+
+
+data SubscribableMessage r
+  = SubscribableUpdate r
+  | SubscribableNotAvailable SomeException
+instance Functor SubscribableMessage where
+  fmap fn (SubscribableUpdate r) = SubscribableUpdate (fn r)
+  fmap _ (SubscribableNotAvailable ex) = SubscribableNotAvailable ex
+
+
+class IsSubscribable r a | a -> r where
+  toSubscribable :: a -> Subscribable r
+  toSubscribable x = Subscribable x
+
+  subscribe
+    :: MonadResourceManager m
+    => a
+    -> (forall f. MonadResourceManager f => SubscribableMessage r -> f (Awaitable ()))
+    -> m ()
+  subscribe x = subscribe (toSubscribable x)
+  {-# MINIMAL toSubscribable | subscribe #-}
+
+data Subscribable r where
+  Subscribable :: IsSubscribable r a => a -> Subscribable r
+  MappedSubscribable :: IsSubscribable a o => (a -> r) -> o -> Subscribable r
+  MultiSubscribable :: [Subscribable r] -> Subscribable r
+
+instance IsSubscribable r (Subscribable r) where
+  toSubscribable = id
+  subscribe (Subscribable x) callback = subscribe x callback
+  subscribe (MappedSubscribable fn x) callback = subscribe x (callback . fmap fn)
+  subscribe (MultiSubscribable xs) callback = forM_ xs (`subscribe` callback)
+
+instance Functor Subscribable where
+  fmap fn (Subscribable x) = MappedSubscribable fn x
+  fmap fn (MappedSubscribable fn2 x) = MappedSubscribable (fn . fn2) x
+  fmap fn x@(MultiSubscribable _) = MappedSubscribable fn x
+
+instance Semigroup (Subscribable r) where
+  MultiSubscribable [] <> x = x
+  x <> MultiSubscribable [] = x
+  MultiSubscribable as <> MultiSubscribable bs = MultiSubscribable $ as <> bs
+  MultiSubscribable as <> b = MultiSubscribable $ as <> [b]
+  a <> MultiSubscribable bs = MultiSubscribable $ [a] <> bs
+  a <> b = MultiSubscribable [a, b]
+
+instance Monoid (Subscribable r) where
+  mempty = MultiSubscribable []
+
+
+newtype SubscribableEvent r = SubscribableEvent (TVar (HM.HashMap Unique (SubscribableMessage r -> IO (Awaitable()))))
+instance IsSubscribable r (SubscribableEvent r) where
+  subscribe (SubscribableEvent tvar) callback = mask_ do
+    key <- liftIO newUnique
+    resourceManager <- askResourceManager
+    liftIO $ atomically do
+      callbackMap <- readTVar tvar
+      writeTVar tvar $ HM.insert key (\msg -> runReaderT (callback msg) resourceManager) callbackMap
+    registerDisposable =<< synchronousDisposable (disposeFn key)
+      where
+        disposeFn :: Unique -> IO ()
+        disposeFn key = atomically do
+          callbackMap <- readTVar tvar
+          writeTVar tvar $ HM.delete key callbackMap
+
+
+newSubscribableEvent :: MonadIO m => m (SubscribableEvent r)
+newSubscribableEvent = liftIO $ SubscribableEvent <$> newTVarIO HM.empty
+
+raiseSubscribableEvent :: MonadIO m => SubscribableEvent r -> r -> m ()
+raiseSubscribableEvent (SubscribableEvent tvar) r = liftIO do
+  callbackMap <- readTVarIO tvar
+  awaitables <- forM (HM.elems callbackMap) \callback -> do
+    callback $ SubscribableUpdate r
+  await $ mconcat awaitables
diff --git a/test/Quasar/SubscribableSpec.hs b/test/Quasar/SubscribableSpec.hs
new file mode 100644
index 0000000..c9ca1bc
--- /dev/null
+++ b/test/Quasar/SubscribableSpec.hs
@@ -0,0 +1,58 @@
+module Quasar.SubscribableSpec (
+  spec,
+) where
+
+import Control.Concurrent.STM
+import Quasar.Prelude
+import Quasar.ResourceManager
+import Quasar.Subscribable
+import Test.Hspec
+
+
+spec :: Spec
+spec = do
+  describe "SubscribableEvent" $ parallel do
+
+    it "can be subscribed" $ io $ withResourceManagerM do
+      event <- newSubscribableEvent
+      resultVar <- liftIO newEmptyTMVarIO
+      subscribe event $ liftIO . \case
+        SubscribableUpdate r -> atomically (putTMVar resultVar r) >> mempty
+        SubscribableNotAvailable ex -> throwIO ex
+      raiseSubscribableEvent event (42 :: Int)
+      liftIO $ atomically (tryTakeTMVar resultVar) `shouldReturn` Just 42
+
+    it "stops calling the callback after the subscription is disposed" $ io $ withResourceManagerM do
+      event <- newSubscribableEvent
+      resultVar <- liftIO $ newEmptyTMVarIO
+      withSubResourceManagerM do
+        subscribe event $ liftIO . \case
+          SubscribableUpdate r -> atomically (putTMVar resultVar r) >> mempty
+          SubscribableNotAvailable ex -> throwIO ex
+        raiseSubscribableEvent event (42 :: Int)
+        liftIO $ atomically (tryTakeTMVar resultVar) `shouldReturn` Just 42
+      raiseSubscribableEvent event (21 :: Int)
+      liftIO $ atomically (tryTakeTMVar resultVar) `shouldReturn` Nothing
+
+    it "can be fmap'ed" $ io $ withResourceManagerM do
+      event <- newSubscribableEvent
+      let subscribable = (* 2) <$> toSubscribable event
+      resultVar <- liftIO $ newEmptyTMVarIO
+      subscribe subscribable $ liftIO . \case
+        SubscribableUpdate r -> atomically (putTMVar resultVar r) >> mempty
+        SubscribableNotAvailable ex -> throwIO ex
+      raiseSubscribableEvent event (21 :: Int)
+      liftIO $ atomically (tryTakeTMVar resultVar) `shouldReturn` Just 42
+
+    it "can be combined with other events" $ io $ withResourceManagerM do
+      event1 <- newSubscribableEvent
+      event2 <- newSubscribableEvent
+      let subscribable = toSubscribable event1 <> toSubscribable event2
+      resultVar <- liftIO $ newEmptyTMVarIO
+      subscribe subscribable $ liftIO . \case
+        SubscribableUpdate r -> atomically (putTMVar resultVar r) >> mempty
+        SubscribableNotAvailable ex -> throwIO ex
+      raiseSubscribableEvent event1 (21 :: Int)
+      liftIO $ atomically (tryTakeTMVar resultVar) `shouldReturn` Just 21
+      raiseSubscribableEvent event2 (42 :: Int)
+      liftIO $ atomically (tryTakeTMVar resultVar) `shouldReturn` Just 42
-- 
GitLab