Skip to content
Snippets Groups Projects
Commit 9f627199 authored by Jens Nolte's avatar Jens Nolte
Browse files

Continue implementing BasicObservable

parent dbee9456
No related branches found
No related tags found
No related merge requests found
module Qd.Observable where
module Qd.Observable (
Observable,
IsObservable(..),
subscribe',
SubscriptionHandle,
unsubscribe,
Callback,
ObservableState,
ObservableMessage,
MessageReason(..),
BasicObservable(..),
Freshness(..),
mkBasicObservable,
staleBasicObservable,
updateBasicObservable,
) where
import Control.Concurrent.MVar
import Data.List (delete)
import Control.Monad.Fix (mfix)
import qualified Data.HashMap.Strict as HM
import Data.Time.Clock (UTCTime, getCurrentTime)
import Data.Unique
data Freshness = Fresh | Stale
deriving (Eq, Show)
data MessageReason = Current | Update
deriving (Eq, Show)
type ObservableState v = Maybe (v, Freshness, UTCTime)
type ObservableMessage v = (MessageReason, ObservableState v)
mapObservableState :: Monad m => (a -> m b) -> ObservableState a -> m (ObservableState b)
mapObservableState _ Nothing = return Nothing
mapObservableState f (Just (v, fr, t)) = Just . (, fr, t) <$> f v
data Listener v = Listener Unique (ObservableMessage v -> IO ())
instance Eq (Listener v) where
Listener a _ == Listener b _ = a == b
mapObservableMessage :: Monad m => (a -> m b) -> ObservableMessage a -> m (ObservableMessage b)
mapObservableMessage f (r, s) = (r, ) <$> mapObservableState f s
createListener :: (ObservableMessage v -> IO ()) -> IO (Listener v)
createListener f = Listener <$> newUnique <*> (return f)
newtype SubscriptionHandle = SubscriptionHandle (IO ())
unsubscribe :: SubscriptionHandle -> IO ()
unsubscribe (SubscriptionHandle unsubscribeAction) = unsubscribeAction
class IsObservable v o where
class IsObservable v o | o -> v where
getValue :: o -> IO (ObservableState v)
subscribe :: o -> Listener v -> IO ()
unsubscribe :: o -> Listener v -> IO ()
subscribe :: o -> (ObservableMessage v -> IO ()) -> IO SubscriptionHandle
mapObservable :: (v -> IO a) -> o -> Observable a
mapObservable f = Observable . MappedObservable f
subscribe' :: IsObservable v o => o -> (SubscriptionHandle -> ObservableMessage v -> IO ()) -> IO SubscriptionHandle
subscribe' observable callback = mfix $ \subscription -> subscribe observable (callback subscription)
type Callback v = ObservableMessage v -> IO ()
-- | Wraps IsObservable in a concrete type
data Observable v = forall o. IsObservable v o => Observable o
instance IsObservable v (Observable v) where
getValue (Observable o) = getValue o
subscribe (Observable o) = subscribe o
unsubscribe (Observable o) = unsubscribe o
mapObservable f (Observable o) = mapObservable f o
newtype BasicObservable v = BasicObservable (MVar (ObservableState v, [Listener v]))
instance Functor Observable where
fmap f = mapObservable (return . f)
newtype BasicObservable v = BasicObservable (MVar (ObservableState v, HM.HashMap Unique (Callback v)))
instance IsObservable v (BasicObservable v) where
getValue (BasicObservable mvar) = fst <$> readMVar mvar
subscribe (BasicObservable mvar) listener@(Listener _ callback) = do
modifyMVar_ mvar $ \(state, listeners) -> do
subscribe (BasicObservable mvar) callback = do
key <- newUnique
modifyMVar_ mvar $ \(state, subscribers) -> do
-- Call listener
callback (Current, state)
return (state, listener:listeners)
unsubscribe (BasicObservable mvar) listener = modifyMVar_ mvar $ \(state, listeners) -> return (state, delete listener listeners)
return (state, HM.insert key callback subscribers)
return $ SubscriptionHandle $ unsubscribe' key
where
unsubscribe' :: Unique -> IO ()
unsubscribe' key = modifyMVar_ mvar $ \(state, subscribers) -> return (state, HM.delete key subscribers)
mkBasicObservable :: Maybe v -> IO (BasicObservable v)
mkBasicObservable defaultValue = do
now <- getCurrentTime
BasicObservable <$> newMVar ((, Fresh, now) <$> defaultValue, [])
BasicObservable <$> newMVar ((, Fresh, now) <$> defaultValue, HM.empty)
staleBasicObservable :: BasicObservable v -> IO ()
staleBasicObservable (BasicObservable mvar) = do
modifyMVar_ mvar $ \(oldState, listeners) -> do
modifyMVar_ mvar $ \(oldState, subscribers) -> do
let newState = (\(v, _, t) -> (v, Stale, t)) <$> oldState
mapM_ (\(Listener _ callback) -> callback (Update, newState)) listeners
return (newState, listeners)
mapM_ (\callback -> callback (Update, newState)) subscribers
return (newState, subscribers)
updateBasicObservable :: forall v. BasicObservable v -> Maybe v -> IO ()
updateBasicObservable (BasicObservable mvar) value = do
now <- getCurrentTime
let newState = (, Fresh, now) <$> value
modifyMVar_ mvar $ \(state, listeners) -> do
mapM_ (\(Listener _ callback) -> callback (Update, state)) listeners
return (newState, listeners)
mapObservable :: (a -> b) -> Observable a -> Observable b
mapObservable = undefined
modifyMVar_ mvar $ \(state, subscribers) -> do
mapM_ (\callback -> callback (Update, state)) subscribers
return (newState, subscribers)
mapMObservable :: (a -> IO b) -> Observable a -> Observable b
mapMObservable = undefined
data MappedObservable b = forall a o. IsObservable a o => MappedObservable (a -> IO b) o
instance IsObservable v (MappedObservable v) where
getValue (MappedObservable f observable) = mapObservableState f =<< getValue observable
subscribe (MappedObservable f observable) callback = subscribe observable (callback <=< mapObservableMessage f)
mapObservable f1 (MappedObservable f2 upstream) = Observable $ MappedObservable (f1 <=< f2) upstream
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment