diff --git a/src/Quasar/Observable.hs b/src/Quasar/Observable.hs index e2f0c655e4e8f3eec11f81f8b68327e658adaade..7c69ac132dfd0ca0ec65642b0e309fd42218f316 100644 --- a/src/Quasar/Observable.hs +++ b/src/Quasar/Observable.hs @@ -242,16 +242,15 @@ instance IsRetrievable r (LiftA2Observable r) where liftA2 fn (await x) (await y) instance IsObservable r (LiftA2Observable r) where - observe (LiftA2Observable fn fx fy) callback = do - -- TODO use alternative to ensureSTM - var0 <- ensureSTM $ newTVar Nothing - var1 <- ensureSTM $ newTVar Nothing + observe (LiftA2Observable fn fx fy) callback = ensureQuasarSTM do + var0 <- liftSTM $ newTVar Nothing + var1 <- liftSTM $ newTVar Nothing let callCallback = do - mergedValue <- ensureSTM $ runMaybeT $ liftA2 (liftA2 fn) (MaybeT (readTVar var0)) (MaybeT (readTVar var1)) + mergedValue <- liftSTM $ runMaybeT $ liftA2 (liftA2 fn) (MaybeT (readTVar var0)) (MaybeT (readTVar var1)) -- Run the callback only once both values have been received mapM_ callback mergedValue - observe fx (\update -> ensureSTM (writeTVar var0 (Just update)) >> callCallback) - observe fy (\update -> ensureSTM (writeTVar var1 (Just update)) >> callCallback) + observe fx (\update -> liftSTM (writeTVar var0 (Just update)) >> callCallback) + observe fy (\update -> liftSTM (writeTVar var1 (Just update)) >> callCallback) pingObservable (LiftA2Observable _ fx fy) = liftQuasarIO do -- LATER: keep backpressure for parallel network requests