diff --git a/src/lib/Qd/Observable.hs b/src/lib/Qd/Observable.hs index f306ce1a20618e4749d7e86ffa2a0bd1c023eb10..5eb95dd279dffd5cccb82331968d19f8be88e817 100644 --- a/src/lib/Qd/Observable.hs +++ b/src/lib/Qd/Observable.hs @@ -1,68 +1,103 @@ -module Qd.Observable where +module Qd.Observable ( + Observable, + IsObservable(..), + subscribe', + SubscriptionHandle, + unsubscribe, + Callback, + ObservableState, + ObservableMessage, + MessageReason(..), + + BasicObservable(..), + Freshness(..), + mkBasicObservable, + staleBasicObservable, + updateBasicObservable, +) where import Control.Concurrent.MVar -import Data.List (delete) +import Control.Monad.Fix (mfix) +import qualified Data.HashMap.Strict as HM import Data.Time.Clock (UTCTime, getCurrentTime) import Data.Unique data Freshness = Fresh | Stale + deriving (Eq, Show) data MessageReason = Current | Update + deriving (Eq, Show) type ObservableState v = Maybe (v, Freshness, UTCTime) type ObservableMessage v = (MessageReason, ObservableState v) +mapObservableState :: Monad m => (a -> m b) -> ObservableState a -> m (ObservableState b) +mapObservableState _ Nothing = return Nothing +mapObservableState f (Just (v, fr, t)) = Just . (, fr, t) <$> f v -data Listener v = Listener Unique (ObservableMessage v -> IO ()) -instance Eq (Listener v) where - Listener a _ == Listener b _ = a == b +mapObservableMessage :: Monad m => (a -> m b) -> ObservableMessage a -> m (ObservableMessage b) +mapObservableMessage f (r, s) = (r, ) <$> mapObservableState f s -createListener :: (ObservableMessage v -> IO ()) -> IO (Listener v) -createListener f = Listener <$> newUnique <*> (return f) +newtype SubscriptionHandle = SubscriptionHandle (IO ()) +unsubscribe :: SubscriptionHandle -> IO () +unsubscribe (SubscriptionHandle unsubscribeAction) = unsubscribeAction -class IsObservable v o where +class IsObservable v o | o -> v where getValue :: o -> IO (ObservableState v) - subscribe :: o -> Listener v -> IO () - unsubscribe :: o -> Listener v -> IO () + subscribe :: o -> (ObservableMessage v -> IO ()) -> IO SubscriptionHandle + mapObservable :: (v -> IO a) -> o -> Observable a + mapObservable f = Observable . MappedObservable f + +subscribe' :: IsObservable v o => o -> (SubscriptionHandle -> ObservableMessage v -> IO ()) -> IO SubscriptionHandle +subscribe' observable callback = mfix $ \subscription -> subscribe observable (callback subscription) + +type Callback v = ObservableMessage v -> IO () -- | Wraps IsObservable in a concrete type data Observable v = forall o. IsObservable v o => Observable o instance IsObservable v (Observable v) where getValue (Observable o) = getValue o subscribe (Observable o) = subscribe o - unsubscribe (Observable o) = unsubscribe o + mapObservable f (Observable o) = mapObservable f o -newtype BasicObservable v = BasicObservable (MVar (ObservableState v, [Listener v])) +instance Functor Observable where + fmap f = mapObservable (return . f) + +newtype BasicObservable v = BasicObservable (MVar (ObservableState v, HM.HashMap Unique (Callback v))) instance IsObservable v (BasicObservable v) where getValue (BasicObservable mvar) = fst <$> readMVar mvar - subscribe (BasicObservable mvar) listener@(Listener _ callback) = do - modifyMVar_ mvar $ \(state, listeners) -> do + subscribe (BasicObservable mvar) callback = do + key <- newUnique + modifyMVar_ mvar $ \(state, subscribers) -> do -- Call listener callback (Current, state) - return (state, listener:listeners) - unsubscribe (BasicObservable mvar) listener = modifyMVar_ mvar $ \(state, listeners) -> return (state, delete listener listeners) + return (state, HM.insert key callback subscribers) + return $ SubscriptionHandle $ unsubscribe' key + where + unsubscribe' :: Unique -> IO () + unsubscribe' key = modifyMVar_ mvar $ \(state, subscribers) -> return (state, HM.delete key subscribers) mkBasicObservable :: Maybe v -> IO (BasicObservable v) mkBasicObservable defaultValue = do now <- getCurrentTime - BasicObservable <$> newMVar ((, Fresh, now) <$> defaultValue, []) + BasicObservable <$> newMVar ((, Fresh, now) <$> defaultValue, HM.empty) staleBasicObservable :: BasicObservable v -> IO () staleBasicObservable (BasicObservable mvar) = do - modifyMVar_ mvar $ \(oldState, listeners) -> do + modifyMVar_ mvar $ \(oldState, subscribers) -> do let newState = (\(v, _, t) -> (v, Stale, t)) <$> oldState - mapM_ (\(Listener _ callback) -> callback (Update, newState)) listeners - return (newState, listeners) + mapM_ (\callback -> callback (Update, newState)) subscribers + return (newState, subscribers) updateBasicObservable :: forall v. BasicObservable v -> Maybe v -> IO () updateBasicObservable (BasicObservable mvar) value = do now <- getCurrentTime let newState = (, Fresh, now) <$> value - modifyMVar_ mvar $ \(state, listeners) -> do - mapM_ (\(Listener _ callback) -> callback (Update, state)) listeners - return (newState, listeners) - -mapObservable :: (a -> b) -> Observable a -> Observable b -mapObservable = undefined + modifyMVar_ mvar $ \(state, subscribers) -> do + mapM_ (\callback -> callback (Update, state)) subscribers + return (newState, subscribers) -mapMObservable :: (a -> IO b) -> Observable a -> Observable b -mapMObservable = undefined +data MappedObservable b = forall a o. IsObservable a o => MappedObservable (a -> IO b) o +instance IsObservable v (MappedObservable v) where + getValue (MappedObservable f observable) = mapObservableState f =<< getValue observable + subscribe (MappedObservable f observable) callback = subscribe observable (callback <=< mapObservableMessage f) + mapObservable f1 (MappedObservable f2 upstream) = Observable $ MappedObservable (f1 <=< f2) upstream