Skip to content
Snippets Groups Projects
Commit f9d78416 authored by Jens Nolte's avatar Jens Nolte
Browse files

Implement mergeObservable to combine values from two observables

parent 14b65f3f
No related branches found
No related tags found
No related merge requests found
...@@ -18,6 +18,8 @@ module Qd.Observable ( ...@@ -18,6 +18,8 @@ module Qd.Observable (
modifyObservableVar, modifyObservableVar,
joinObservable, joinObservable,
joinObservableEither, joinObservableEither,
mergeObservable,
mergeObservable',
FnObservable(..), FnObservable(..),
) where ) where
...@@ -25,6 +27,7 @@ import Control.Concurrent.MVar ...@@ -25,6 +27,7 @@ import Control.Concurrent.MVar
import Control.Monad.Fix (mfix) import Control.Monad.Fix (mfix)
import Data.Binary (Binary) import Data.Binary (Binary)
import qualified Data.HashMap.Strict as HM import qualified Data.HashMap.Strict as HM
import Data.IORef
import Data.Unique import Data.Unique
data MessageReason = Current | Update data MessageReason = Current | Update
...@@ -75,7 +78,7 @@ instance Observable v o => Observable v (IO o) where ...@@ -75,7 +78,7 @@ instance Observable v o => Observable v (IO o) where
subscribe observable callback subscribe observable callback
class Settable v a where class Settable v a | a -> v where
setValue :: a -> v -> IO () setValue :: a -> v -> IO ()
...@@ -199,6 +202,42 @@ instance forall e o i v. (Observable (Either e i) o, Observable v i) => Observab ...@@ -199,6 +202,42 @@ instance forall e o i v. (Observable (Either e i) o, Observable v i) => Observab
joinObservableEither :: (Observable (Either e i) o, Observable v i) => o -> SomeObservable (Either e v) joinObservableEither :: (Observable (Either e i) o, Observable v i) => o -> SomeObservable (Either e v)
joinObservableEither = SomeObservable . JoinedObservableEither joinObservableEither = SomeObservable . JoinedObservableEither
data MergedObservable o0 v0 o1 v1 r = MergedObservable (Maybe v0 -> Maybe v1 -> Maybe r) o0 o1
instance forall o0 v0 o1 v1 r. (Observable v0 o0, Observable v1 o1) => Observable r (MergedObservable o0 v0 o1 v1 r) where
getValue (MergedObservable merge obs0 obs1) = do
x0 <- getValue obs0
x1 <- getValue obs1
return $ merge x0 x1
subscribe (MergedObservable merge obs0 obs1) callback = do
currentValuesTupleRef <- newIORef (Nothing, Nothing)
sub0 <- subscribe obs0 (mergeCallback currentValuesTupleRef . fmap Left)
sub1 <- subscribe obs1 (mergeCallback currentValuesTupleRef . fmap Right)
return $ SubscriptionHandle{unsubscribe = unsubscribe sub0 >> unsubscribe sub1}
where
mergeCallback :: IORef (Maybe v0, Maybe v1) -> (MessageReason, Either (Maybe v0) (Maybe v1)) -> IO ()
mergeCallback currentValuesTupleRef (reason, state) = do
currentTuple <- atomicModifyIORef' currentValuesTupleRef (dup . updateTuple state)
callback (reason, uncurry merge $ currentTuple)
updateTuple :: Either (Maybe v0) (Maybe v1) -> (Maybe v0, Maybe v1) -> (Maybe v0, Maybe v1)
updateTuple (Left l) (_, r) = (l, r)
updateTuple (Right r) (l, _) = (l, r)
dup :: a -> (a, a)
dup x = (x, x)
-- | Merge two observables using a given merge function. Whenever the value of one of the inputs changes, the resulting observable updates according to the merge function.
--
-- There is no caching involed, every subscriber effectively subscribes to both input observables.
mergeObservable :: (Observable v0 o0, Observable v1 o1) => (Maybe v0 -> Maybe v1 -> Maybe r) -> o0 -> o1 -> SomeObservable r
mergeObservable merge x y = SomeObservable $ MergedObservable merge x y
-- | Like `mergeObservable`, but with a simplified signature that ignores the Maybe wrapper: If either value is `Nothing`, the resulting value will be `Nothing`.
mergeObservable' :: (Observable v0 o0, Observable v1 o1) => (v0 -> v1 -> r) -> o0 -> o1 -> SomeObservable r
mergeObservable' merge x y = SomeObservable $ MergedObservable (liftA2 merge) x y
-- | Data type that can be used as an implementation for the `Observable` interface that works by directly providing functions for `getValue` and `subscribe`.
data FnObservable v = FnObservable { data FnObservable v = FnObservable {
getValueFn :: IO (ObservableState v), getValueFn :: IO (ObservableState v),
subscribeFn :: (ObservableMessage v -> IO ()) -> IO SubscriptionHandle subscribeFn :: (ObservableMessage v -> IO ()) -> IO SubscriptionHandle
......
module Qd.ObservableSpec where
import Test.Hspec
import Qd.Observable
import Control.Monad (void)
import Data.IORef
spec :: Spec
spec = do
mergeObservableSpec
mergeObservableSpec :: Spec
mergeObservableSpec = do
describe "mergeObservable" $ parallel $ do
it "merges correctly using getValue" $ do
a <- newObservableVar Nothing
b <- newObservableVar Nothing
let mergedObservable = mergeObservable (\v0 v1 -> Just (v0, v1)) a b
let latestShouldBe = (getValue mergedObservable `shouldReturn`) . Just
testSequence a b latestShouldBe
it "merges correctly using subscribe" $ do
a <- newObservableVar Nothing
b <- newObservableVar Nothing
let mergedObservable = mergeObservable (\v0 v1 -> Just (v0, v1)) a b
(latestRef :: IORef (Maybe (Maybe String, Maybe String))) <- newIORef Nothing
void $ subscribe mergedObservable (writeIORef latestRef . snd)
let latestShouldBe = ((readIORef latestRef) `shouldReturn`) . Just
testSequence a b latestShouldBe
where
testSequence :: ObservableVar String -> ObservableVar String -> ((Maybe String, Maybe String) -> IO ()) -> IO ()
testSequence a b latestShouldBe = do
latestShouldBe (Nothing, Nothing)
setValue a "a0"
latestShouldBe (Just "a0", Nothing)
setValue b "b0"
latestShouldBe (Just "a0", Just "b0")
setValue a "a1"
latestShouldBe (Just "a1", Just "b0")
setValue b "b1"
latestShouldBe (Just "a1", Just "b1")
-- No change
setValue a "a1"
latestShouldBe (Just "a1", Just "b1")
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment