Skip to content
Snippets Groups Projects
Commit deeca79f authored by Jens Nolte's avatar Jens Nolte
Browse files

Add packing and unpacking of ObservableMessages


Co-authored-by: default avatarJan Beinke <git@janbeinke.com>
parent 056640cc
No related branches found
No related tags found
No related merge requests found
module Quasar.Network.Runtime.Observable (newObservableStub, observeToStream) where
module Quasar.Network.Runtime.Observable (
PackedObservableMessage,
newObservableStub,
observeToStream,
) where
import Data.Binary (Binary)
import Quasar.Awaitable
import Quasar.Core
import Quasar.Network.Exception
import Quasar.Network.Runtime
import Quasar.Observable
import Quasar.Prelude
data PackedObservableMessage v
= PackedObservableUpdate v
| PackedObservableLoading
| PackedObservableNotAvailable PackedException
deriving stock (Eq, Show, Generic)
deriving anyclass (Binary)
packObservableMessage :: ObservableMessage r -> PackedObservableMessage r
packObservableMessage (ObservableUpdate x) = PackedObservableUpdate x
packObservableMessage (ObservableLoading) = PackedObservableLoading
packObservableMessage (ObservableNotAvailable ex) = PackedObservableNotAvailable (packException ex)
unpackObservableMessage :: PackedObservableMessage r -> ObservableMessage r
unpackObservableMessage (PackedObservableUpdate x) = ObservableUpdate x
unpackObservableMessage (PackedObservableLoading) = ObservableLoading
unpackObservableMessage (PackedObservableNotAvailable ex) = ObservableNotAvailable (unpackException ex)
newObservableStub
:: forall v. Binary v
=> (forall m. MonadIO m => m (Awaitable v))
-> (forall m. MonadIO m => m (Stream Void v))
-> (forall m. MonadIO m => m (Stream Void (PackedObservableMessage v)))
-> IO (Observable v)
newObservableStub startRetrieveRequest startObserveRequest = pure uncachedObservable -- TODO cache
where
......@@ -18,14 +40,15 @@ newObservableStub startRetrieveRequest startObserveRequest = pure uncachedObserv
uncachedObservable = fnObservable observeFn retrieveFn
observeFn :: (ObservableMessage v -> IO ()) -> IO Disposable
observeFn callback = do
-- TODO send updates about the connection status
stream <- startObserveRequest
streamSetHandler stream (callback . ObservableUpdate)
streamSetHandler stream (callback . unpackObservableMessage)
pure $ synchronousDisposable $ streamClose stream
retrieveFn :: forall m. HasResourceManager m => m (Task v)
retrieveFn = toTask <$> startRetrieveRequest
observeToStream :: HasResourceManager m => Observable v -> Stream v Void -> m ()
observeToStream :: (Binary v, HasResourceManager m) => Observable v -> Stream (PackedObservableMessage v) Void -> m ()
observeToStream observable stream = do
disposable <- liftIO $ observe observable undefined
_disposable <- liftIO $ observe observable $ streamSend stream . packObservableMessage
-- TODO: dispose when the stream is closed
undefined
pure ()
......@@ -372,7 +372,7 @@ generateObservable api observable = pure Code {
observeRequest = Request {
name = observable.name <> "_observe",
fields = [],
createdResources = [RequestCreateStream [t|Void|] observable.ty],
createdResources = [RequestCreateStream [t|Void|] [t|PackedObservableMessage $(observable.ty)|]],
mResponse = Nothing,
handlerE = \ctx -> [|observeToStream $(observableE ctx) $(ctx.resourceEs !! 0)|]
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment