diff --git a/src/Quasar/Network/Runtime/Observable.hs b/src/Quasar/Network/Runtime/Observable.hs index 8391e3821f0fd8fe2a0174a90c07213326ea8f78..5d9181a0a46aae3d7e9927801e85147b93fa05ba 100644 --- a/src/Quasar/Network/Runtime/Observable.hs +++ b/src/Quasar/Network/Runtime/Observable.hs @@ -1,16 +1,38 @@ -module Quasar.Network.Runtime.Observable (newObservableStub, observeToStream) where +module Quasar.Network.Runtime.Observable ( + PackedObservableMessage, + newObservableStub, + observeToStream, +) where import Data.Binary (Binary) import Quasar.Awaitable import Quasar.Core +import Quasar.Network.Exception import Quasar.Network.Runtime import Quasar.Observable import Quasar.Prelude +data PackedObservableMessage v + = PackedObservableUpdate v + | PackedObservableLoading + | PackedObservableNotAvailable PackedException + deriving stock (Eq, Show, Generic) + deriving anyclass (Binary) + +packObservableMessage :: ObservableMessage r -> PackedObservableMessage r +packObservableMessage (ObservableUpdate x) = PackedObservableUpdate x +packObservableMessage (ObservableLoading) = PackedObservableLoading +packObservableMessage (ObservableNotAvailable ex) = PackedObservableNotAvailable (packException ex) + +unpackObservableMessage :: PackedObservableMessage r -> ObservableMessage r +unpackObservableMessage (PackedObservableUpdate x) = ObservableUpdate x +unpackObservableMessage (PackedObservableLoading) = ObservableLoading +unpackObservableMessage (PackedObservableNotAvailable ex) = ObservableNotAvailable (unpackException ex) + newObservableStub :: forall v. Binary v => (forall m. MonadIO m => m (Awaitable v)) - -> (forall m. MonadIO m => m (Stream Void v)) + -> (forall m. MonadIO m => m (Stream Void (PackedObservableMessage v))) -> IO (Observable v) newObservableStub startRetrieveRequest startObserveRequest = pure uncachedObservable -- TODO cache where @@ -18,14 +40,15 @@ newObservableStub startRetrieveRequest startObserveRequest = pure uncachedObserv uncachedObservable = fnObservable observeFn retrieveFn observeFn :: (ObservableMessage v -> IO ()) -> IO Disposable observeFn callback = do + -- TODO send updates about the connection status stream <- startObserveRequest - streamSetHandler stream (callback . ObservableUpdate) + streamSetHandler stream (callback . unpackObservableMessage) pure $ synchronousDisposable $ streamClose stream retrieveFn :: forall m. HasResourceManager m => m (Task v) retrieveFn = toTask <$> startRetrieveRequest -observeToStream :: HasResourceManager m => Observable v -> Stream v Void -> m () +observeToStream :: (Binary v, HasResourceManager m) => Observable v -> Stream (PackedObservableMessage v) Void -> m () observeToStream observable stream = do - disposable <- liftIO $ observe observable undefined + _disposable <- liftIO $ observe observable $ streamSend stream . packObservableMessage -- TODO: dispose when the stream is closed - undefined + pure () diff --git a/src/Quasar/Network/TH.hs b/src/Quasar/Network/TH.hs index ca75dd1bcdc7969da041c2825e6c7cfbe79b4675..ca042a5f05ce36361dcdc758069f632bb1dac256 100644 --- a/src/Quasar/Network/TH.hs +++ b/src/Quasar/Network/TH.hs @@ -372,7 +372,7 @@ generateObservable api observable = pure Code { observeRequest = Request { name = observable.name <> "_observe", fields = [], - createdResources = [RequestCreateStream [t|Void|] observable.ty], + createdResources = [RequestCreateStream [t|Void|] [t|PackedObservableMessage $(observable.ty)|]], mResponse = Nothing, handlerE = \ctx -> [|observeToStream $(observableE ctx) $(ctx.resourceEs !! 0)|] }