diff --git a/src/Quasar/Network/Runtime/Observable.hs b/src/Quasar/Network/Runtime/Observable.hs index 49ae3b74143217199556c1239647b2d6547ef0cb..f46eb9da34eb65ae9b22f64541fb386eba6c7ec8 100644 --- a/src/Quasar/Network/Runtime/Observable.hs +++ b/src/Quasar/Network/Runtime/Observable.hs @@ -1,12 +1,31 @@ -module Quasar.Network.Runtime.Observable () where +module Quasar.Network.Runtime.Observable (newObservableStub, observeToStream) where -import Quasar.Network.Runtime +import Data.Binary (Binary) +import Quasar.Awaitable import Quasar.Core +import Quasar.Network.Runtime import Quasar.Observable import Quasar.Prelude -newNetworkObservable - :: ((ObservableMessage v -> IO ()) -> IO Disposable) - -> (forall m. HasResourceManager m => m (Task v)) +newObservableStub + :: forall v. Binary v => + (forall m. MonadIO m => m (Stream Void v)) + -> (forall m. MonadIO m => m (Awaitable v)) -> IO (Observable v) -newNetworkObservable observeFn retrieveFn = pure $ fnObservable observeFn retrieveFn +newObservableStub startObserveRequest startRetrieveRequest = pure uncachedObservable -- TODO cache + where + uncachedObservable :: Observable v + uncachedObservable = fnObservable observeFn retrieveFn + observeFn :: (ObservableMessage v -> IO ()) -> IO Disposable + observeFn callback = do + stream <- startObserveRequest + streamSetHandler stream (callback . ObservableUpdate) + pure $ synchronousDisposable $ streamClose stream + retrieveFn :: forall m. HasResourceManager m => m (Task v) + retrieveFn = toTask <$> startRetrieveRequest + +observeToStream :: Observable v -> Stream v Void -> IO () +observeToStream observable stream = do + disposable <- observe observable undefined + -- TODO: dispose when the stream is closed + undefined diff --git a/src/Quasar/Network/TH.hs b/src/Quasar/Network/TH.hs index 503d87af3394d93504f81efb2866a8a7f875cb67..eb4035e8b409d873b7093392c4f71c7a3dbbae0d 100644 --- a/src/Quasar/Network/TH.hs +++ b/src/Quasar/Network/TH.hs @@ -27,6 +27,7 @@ import Language.Haskell.TH.Syntax import Quasar.Awaitable import Quasar.Network.Multiplexer import Quasar.Network.Runtime +import Quasar.Network.Runtime.Observable import Quasar.Observable import Quasar.Prelude @@ -159,16 +160,11 @@ clientRequestStub api req = do stubName = clientRequestStubName api req makeStubSig :: Q [Type] -> Q Type makeStubSig arguments = - [t|forall m. MonadIO m => $(buildFunctionType arguments [t|m $(buildTupleType (liftA2 (<>) optionalResultType resourceTypes))|])|] - resourceTypes :: Q [Type] - resourceTypes = sequence $ resourceType <$> req.createdResources + [t|forall m. MonadIO m => $(buildFunctionType arguments [t|m $(buildTupleType (liftA2 (<>) optionalResultType (resourceTypes req)))|])|] optionalResultType :: Q [Type] optionalResultType = case req.mResponse of Nothing -> pure [] Just resp -> sequence [[t|Awaitable $(buildTupleType (sequence ((.ty) <$> resp.fields)))|]] - resourceType :: RequestCreateResource -> Q Type - resourceType RequestCreateChannel = [t|Channel|] - resourceType (RequestCreateStream up down) = [t|Stream $up $down|] clientRequestStub' :: Name -> Name -> [Name] -> Q [Q Dec] clientRequestStub' clientStubPrimeName clientVarName argNames = do @@ -271,19 +267,27 @@ makeServer api@RpcApi{functions} code = sequence [protocolImplDec, logicInstance mainClause :: Q Clause mainClause = do channelNames <- sequence $ newName . ("channel" <>) . show <$> [0 .. (numPipelinedChannels req - 1)] + resourceNames <- sequence $ (\(res, num) -> newName (resourceNamePrefix res <> show num)) <$> zip req.createdResources [0 .. (numPipelinedChannels req - 1)] + handlerName <- newName "handler" fieldNames <- sequence $ newName . (.name) <$> req.fields let requestConP = conP (requestConName api req) (varP <$> fieldNames) ctx = RequestHandlerContext { implRecordE = varE implRecordName, - argumentEs = (varE <$> fieldNames), - channelEs = (varE <$> channelNames) + argumentEs = varE <$> fieldNames, + resourceEs = varE <$> resourceNames } + resourceEs = uncurry createResource <$> zip req.createdResources (varE <$> channelNames) clause [requestConP, listP (varP <$> channelNames)] - (normalB (packResponse req.mResponse (req.handlerE ctx))) - [] + (normalB (packResponse req.mResponse (applyResources resourceEs (varE handlerName)))) + [handlerDec handlerName resourceNames ctx] + + handlerDec :: Name -> [Name] -> RequestHandlerContext -> Q Dec + handlerDec handlerName resourceNames ctx = funD handlerName [clause (varP <$> resourceNames) (normalB (req.handlerE ctx)) []] + applyResources :: [Q Exp] -> Q Exp -> Q Exp + applyResources resourceEs implE = varE 'join `appE` applyM implE resourceEs invalidChannelCountClause :: Q Clause invalidChannelCountClause = do @@ -345,7 +349,7 @@ toField x = Field { name = x.name, ty = x.ty } data RequestHandlerContext = RequestHandlerContext { implRecordE :: Q Exp, argumentEs :: [Q Exp], - channelEs :: [Q Exp] + resourceEs :: [Q Exp] } @@ -364,7 +368,7 @@ generateObservable api observable = pure Code { fields = [], createdResources = [RequestCreateStream [t|Void|] observable.ty], mResponse = Nothing, - handlerE = \ctx -> [|undefined|] + handlerE = \ctx -> [|observeToStream $(observableE ctx) $(ctx.resourceEs !! 0)|] } retrieveRequest :: Request retrieveRequest = Request { @@ -386,8 +390,14 @@ generateObservable api observable = pure Code { serverImplFieldSig = [t|Observable $(observable.ty)|] observableE :: RequestHandlerContext -> Q Exp observableE ctx = [|$(varE serverImplFieldName) $(ctx.implRecordE)|] - --observeE :: Q Exp - --observeE = r + observableStubDec :: [Q Dec] + observableStubDec = [ + sigD (mkName observable.name) [t|$(clientType api) -> Observable $(observable.ty)|] + ] + observeE :: Q Exp + observeE = clientRequestStubE api observeRequest + retrieveE :: Q Exp + retrieveE = clientRequestStubE api retrieveRequest generateFunction :: RpcApi -> RpcFunction -> Q Code generateFunction api fun = do @@ -427,7 +437,7 @@ generateFunction api fun = do serverStreamTypes = sequence $ (\stream -> [t|Stream $(stream.tyDown) $(stream.tyUp)|]) <$> fun.streams serverRequestHandlerE :: RequestHandlerContext -> Q Exp - serverRequestHandlerE ctx = applyChannels (applyArgs (implFieldE ctx.implRecordE)) ctx.channelEs + serverRequestHandlerE ctx = applyResources (applyArgs (implFieldE ctx.implRecordE)) ctx.resourceEs where implFieldE :: Q Exp -> Q Exp implFieldE implRecordE = case fun.fixedHandler of @@ -435,16 +445,12 @@ generateFunction api fun = do Just handler -> [|$(handler) :: $implSig|] applyArgs :: Q Exp -> Q Exp applyArgs implE = foldl appE implE ctx.argumentEs - applyChannels :: Q Exp -> [Q Exp] -> Q Exp - applyChannels implE channelsEs = varE 'join `appE` applyM implE (createStream <$> channelsEs) - where - createStream :: Q Exp -> Q Exp - createStream = (varE 'newStream `appE`) + applyResources :: Q Exp -> [Q Exp] -> Q Exp + applyResources implE resourceEs = foldl appE implE resourceEs clientFunctionStub :: Q [Q Dec] clientFunctionStub = do funArgTypes <- functionArgumentTypes fun - clientType <- [t|Client $(protocolType api)|] pure [ sigD funName (clientRequestStubSig api request), funD funName [clause [] (normalB (clientRequestStubE api request)) []] @@ -466,8 +472,6 @@ hasResult fun = not (null fun.results) numPipelinedChannels :: Request -> Int numPipelinedChannels req = length req.createdResources --- ** Name helper functions - protocolTypeName :: RpcApi -> Name protocolTypeName RpcApi{name} = mkName (name <> "Protocol") @@ -495,6 +499,9 @@ responseFunctionCtorName api fun = mkName (responseTypeIdentifier api <> "_" <> responseConName :: RpcApi -> Response -> Name responseConName api resp = mkName (responseTypeIdentifier api <> "_" <> resp.name) +clientType :: RpcApi -> Q Type +clientType api = [t|Client $(protocolType api)|] + implTypeName :: RpcApi -> Name implTypeName RpcApi{name} = mkName $ name <> "ProtocolImpl" @@ -511,10 +518,7 @@ clientRequestStubE :: RpcApi -> Request -> Q Exp clientRequestStubE api req = (varE (clientRequestStubName api req)) clientRequestStubSig :: RpcApi -> Request -> Q Type -clientRequestStubSig api req = do - reqFieldTypes <- sequence $ (.ty) <$> req.fields - clientType <- [t|Client $(protocolType api)|] - makeStubSig (pure (clientType : reqFieldTypes)) +clientRequestStubSig api req = makeStubSig (sequence ((clientType api) : ((.ty) <$> req.fields))) where makeStubSig :: Q [Type] -> Q Type makeStubSig arguments = @@ -532,6 +536,21 @@ clientRequestStubSig api req = do clientRequestStubSigDec :: RpcApi -> Request -> Q Dec clientRequestStubSigDec api req = sigD (clientRequestStubName api req) (clientRequestStubSig api req) +resourceTypes :: Request -> Q [Type] +resourceTypes req = sequence $ resourceType <$> req.createdResources + +resourceType :: RequestCreateResource -> Q Type +resourceType RequestCreateChannel = [t|Channel|] +resourceType (RequestCreateStream up down) = [t|Stream $up $down|] + +resourceNamePrefix :: RequestCreateResource -> String +resourceNamePrefix RequestCreateChannel = "channel" +resourceNamePrefix (RequestCreateStream _ _) = "stream" + +createResource :: RequestCreateResource -> Q Exp -> Q Exp +createResource RequestCreateChannel channelE = [|pure $channelE|] +createResource (RequestCreateStream up down) channelE = [|newStream $channelE|] + -- * Template Haskell helper functions funT :: Q Type -> Q Type -> Q Type