From deeca79f74774127eead6d8c079356dba5a02143 Mon Sep 17 00:00:00 2001 From: Jens Nolte <git@queezle.net> Date: Thu, 12 Aug 2021 02:09:17 +0200 Subject: [PATCH] Add packing and unpacking of ObservableMessages Co-authored-by: Jan Beinke <git@janbeinke.com> --- src/Quasar/Network/Runtime/Observable.hs | 35 ++++++++++++++++++++---- src/Quasar/Network/TH.hs | 2 +- 2 files changed, 30 insertions(+), 7 deletions(-) diff --git a/src/Quasar/Network/Runtime/Observable.hs b/src/Quasar/Network/Runtime/Observable.hs index 8391e38..5d9181a 100644 --- a/src/Quasar/Network/Runtime/Observable.hs +++ b/src/Quasar/Network/Runtime/Observable.hs @@ -1,16 +1,38 @@ -module Quasar.Network.Runtime.Observable (newObservableStub, observeToStream) where +module Quasar.Network.Runtime.Observable ( + PackedObservableMessage, + newObservableStub, + observeToStream, +) where import Data.Binary (Binary) import Quasar.Awaitable import Quasar.Core +import Quasar.Network.Exception import Quasar.Network.Runtime import Quasar.Observable import Quasar.Prelude +data PackedObservableMessage v + = PackedObservableUpdate v + | PackedObservableLoading + | PackedObservableNotAvailable PackedException + deriving stock (Eq, Show, Generic) + deriving anyclass (Binary) + +packObservableMessage :: ObservableMessage r -> PackedObservableMessage r +packObservableMessage (ObservableUpdate x) = PackedObservableUpdate x +packObservableMessage (ObservableLoading) = PackedObservableLoading +packObservableMessage (ObservableNotAvailable ex) = PackedObservableNotAvailable (packException ex) + +unpackObservableMessage :: PackedObservableMessage r -> ObservableMessage r +unpackObservableMessage (PackedObservableUpdate x) = ObservableUpdate x +unpackObservableMessage (PackedObservableLoading) = ObservableLoading +unpackObservableMessage (PackedObservableNotAvailable ex) = ObservableNotAvailable (unpackException ex) + newObservableStub :: forall v. Binary v => (forall m. MonadIO m => m (Awaitable v)) - -> (forall m. MonadIO m => m (Stream Void v)) + -> (forall m. MonadIO m => m (Stream Void (PackedObservableMessage v))) -> IO (Observable v) newObservableStub startRetrieveRequest startObserveRequest = pure uncachedObservable -- TODO cache where @@ -18,14 +40,15 @@ newObservableStub startRetrieveRequest startObserveRequest = pure uncachedObserv uncachedObservable = fnObservable observeFn retrieveFn observeFn :: (ObservableMessage v -> IO ()) -> IO Disposable observeFn callback = do + -- TODO send updates about the connection status stream <- startObserveRequest - streamSetHandler stream (callback . ObservableUpdate) + streamSetHandler stream (callback . unpackObservableMessage) pure $ synchronousDisposable $ streamClose stream retrieveFn :: forall m. HasResourceManager m => m (Task v) retrieveFn = toTask <$> startRetrieveRequest -observeToStream :: HasResourceManager m => Observable v -> Stream v Void -> m () +observeToStream :: (Binary v, HasResourceManager m) => Observable v -> Stream (PackedObservableMessage v) Void -> m () observeToStream observable stream = do - disposable <- liftIO $ observe observable undefined + _disposable <- liftIO $ observe observable $ streamSend stream . packObservableMessage -- TODO: dispose when the stream is closed - undefined + pure () diff --git a/src/Quasar/Network/TH.hs b/src/Quasar/Network/TH.hs index ca75dd1..ca042a5 100644 --- a/src/Quasar/Network/TH.hs +++ b/src/Quasar/Network/TH.hs @@ -372,7 +372,7 @@ generateObservable api observable = pure Code { observeRequest = Request { name = observable.name <> "_observe", fields = [], - createdResources = [RequestCreateStream [t|Void|] observable.ty], + createdResources = [RequestCreateStream [t|Void|] [t|PackedObservableMessage $(observable.ty)|]], mResponse = Nothing, handlerE = \ctx -> [|observeToStream $(observableE ctx) $(ctx.resourceEs !! 0)|] } -- GitLab