From 6f15b972398f8a03a212bcc7f71d78e3caf4823b Mon Sep 17 00:00:00 2001 From: Jens Nolte <git@queezle.net> Date: Thu, 21 Apr 2022 22:27:31 +0200 Subject: [PATCH] Disable old Observable integration --- src/Quasar/Network/Multiplexer.hs | 2 +- src/Quasar/Network/Runtime/Observable.hs | 8 +- src/Quasar/Network/TH.hs | 98 ++++++------- test/Quasar/NetworkSpec.hs | 170 +++++++++++------------ 4 files changed, 139 insertions(+), 139 deletions(-) diff --git a/src/Quasar/Network/Multiplexer.hs b/src/Quasar/Network/Multiplexer.hs index c8190df..3542d3e 100644 --- a/src/Quasar/Network/Multiplexer.hs +++ b/src/Quasar/Network/Multiplexer.hs @@ -509,7 +509,7 @@ receiveThread multiplexer readFn = do readBytes :: IO BS.ByteString readBytes = readFn `catchAll` \ex -> throwM $ ConnectionLost $ ReceiveFailed ex - -- | Reads and verifies magic bytes. Returns bytes left over from the received chunk(s). + -- Reads and verifies magic bytes. Returns bytes left over from the received chunk(s). checkMagicBytes :: IO BS.ByteString checkMagicBytes = checkMagicBytes' "" where diff --git a/src/Quasar/Network/Runtime/Observable.hs b/src/Quasar/Network/Runtime/Observable.hs index e61f657..935bce8 100644 --- a/src/Quasar/Network/Runtime/Observable.hs +++ b/src/Quasar/Network/Runtime/Observable.hs @@ -1,8 +1,8 @@ module Quasar.Network.Runtime.Observable ( - PackedObservableState, - newObservableClient, - observeToStream, - callRetrieve, + --PackedObservableState, + --newObservableClient, + --observeToStream, + --callRetrieve, ) where import Data.Binary (Binary) diff --git a/src/Quasar/Network/TH.hs b/src/Quasar/Network/TH.hs index 9249971..f9c5058 100644 --- a/src/Quasar/Network/TH.hs +++ b/src/Quasar/Network/TH.hs @@ -107,7 +107,7 @@ setFixedHandler handler = State.modify (\fun -> fun{fixedHandler = Just handler} -- | Generates rpc protocol types, rpc client and rpc server makeRpc :: RpcApi -> Q [Dec] makeRpc api = do - code <- mconcat <$> sequence ((generateFunction api <$> api.functions) <> (generateObservable api <$> api.observables)) + code <- mconcat <$> sequence ((generateFunction api <$> api.functions)) mconcat <$> sequence [makeProtocol api code, makeClient api code, makeServer api code] makeProtocol :: RpcApi -> Code -> Q [Dec] @@ -357,54 +357,54 @@ data RequestHandlerContext = RequestHandlerContext { -- * Rpc function code generator -generateObservable :: RpcApi -> RpcObservable -> Q Code -generateObservable api observable = pure Code { - clientStubDecs = observableStubDec, - requests = [observeRequest, retrieveRequest], - serverImplFields = [varDefaultBangType serverImplFieldName serverImplFieldSig] -} - where - observeRequest :: Request - observeRequest = Request { - name = observable.name <> "_observe", - fields = [], - createdResources = [RequestCreateStream [t|Void|] [t|PackedObservableState $(observable.ty)|]], - mResponse = Nothing, - handlerE = \ctx -> [|observeToStream $(observableE ctx) $(ctx.resourceEs !! 0)|] - } - retrieveRequest :: Request - retrieveRequest = Request { - name = observable.name <> "_retrieve", - fields = [], - createdResources = [], - mResponse = Just retrieveResponse, - handlerE = \ctx -> [|callRetrieve $(observableE ctx)|] - } - retrieveResponse :: Response - retrieveResponse = Response { - name = observable.name <> "_retrieve", - fields = [Field "result" observable.ty] - } - serverImplFieldName :: Name - serverImplFieldName = mkName (observable.name <> "Impl") - serverImplFieldSig :: Q Type - serverImplFieldSig = [t|Observable $(observable.ty)|] - observableE :: RequestHandlerContext -> Q Exp - observableE ctx = [|$(varE serverImplFieldName) $(ctx.implRecordE)|] - observableStubDec :: [Q Dec] - observableStubDec = [ - sigD (mkName observable.name) [t|$(clientType api) -> QuasarIO (Observable $(observable.ty))|], - do - clientName <- newName "client" - let clientE = varE clientName - funD (mkName observable.name) [ - clause [varP clientName] (normalB [|newObservableClient ($retrieveE $clientE) ($observeE $clientE)|]) [] - ] - ] - observeE :: Q Exp - observeE = clientRequestStubE api observeRequest - retrieveE :: Q Exp - retrieveE = clientRequestStubE api retrieveRequest +--generateObservable :: RpcApi -> RpcObservable -> Q Code +--generateObservable api observable = pure Code { +-- clientStubDecs = observableStubDec, +-- requests = [observeRequest, retrieveRequest], +-- serverImplFields = [varDefaultBangType serverImplFieldName serverImplFieldSig] +--} +-- where +-- observeRequest :: Request +-- observeRequest = Request { +-- name = observable.name <> "_observe", +-- fields = [], +-- createdResources = [RequestCreateStream [t|Void|] [t|PackedObservableState $(observable.ty)|]], +-- mResponse = Nothing, +-- handlerE = \ctx -> [|observeToStream $(observableE ctx) $(ctx.resourceEs !! 0)|] +-- } +-- retrieveRequest :: Request +-- retrieveRequest = Request { +-- name = observable.name <> "_retrieve", +-- fields = [], +-- createdResources = [], +-- mResponse = Just retrieveResponse, +-- handlerE = \ctx -> [|callRetrieve $(observableE ctx)|] +-- } +-- retrieveResponse :: Response +-- retrieveResponse = Response { +-- name = observable.name <> "_retrieve", +-- fields = [Field "result" observable.ty] +-- } +-- serverImplFieldName :: Name +-- serverImplFieldName = mkName (observable.name <> "Impl") +-- serverImplFieldSig :: Q Type +-- serverImplFieldSig = [t|Observable $(observable.ty)|] +-- observableE :: RequestHandlerContext -> Q Exp +-- observableE ctx = [|$(varE serverImplFieldName) $(ctx.implRecordE)|] +-- observableStubDec :: [Q Dec] +-- observableStubDec = [ +-- sigD (mkName observable.name) [t|$(clientType api) -> QuasarIO (Observable $(observable.ty))|], +-- do +-- clientName <- newName "client" +-- let clientE = varE clientName +-- funD (mkName observable.name) [ +-- clause [varP clientName] (normalB [|newObservableClient ($retrieveE $clientE) ($observeE $clientE)|]) [] +-- ] +-- ] +-- observeE :: Q Exp +-- observeE = clientRequestStubE api observeRequest +-- retrieveE :: Q Exp +-- retrieveE = clientRequestStubE api retrieveRequest generateFunction :: RpcApi -> RpcFunction -> Q Code generateFunction api fun = do diff --git a/test/Quasar/NetworkSpec.hs b/test/Quasar/NetworkSpec.hs index c6ae81f..b07ee71 100644 --- a/test/Quasar/NetworkSpec.hs +++ b/test/Quasar/NetworkSpec.hs @@ -70,9 +70,9 @@ $(makeRpc $ rpcApi "StreamExample" $ do addStream "stream2" [t|Int|] [t|Int|] ) -$(makeRpc $ rpcApi "ObservableExample" $ do - rpcObservable "intObservable" [t|Int|] - ) +-- $(makeRpc $ rpcApi "ObservableExample" $ do +-- rpcObservable "intObservable" [t|Int|] +-- ) exampleProtocolImpl :: ExampleProtocolImpl exampleProtocolImpl = ExampleProtocolImpl { @@ -129,85 +129,85 @@ spec = parallel $ do liftIO $ streamSend stream (x, y) liftIO $ takeMVar resultMVar `shouldReturn` x * y - describe "ObservableExample" $ do - it "can retrieve values" $ rm do - var <- newObservableVarIO 42 - withStandaloneClient @ObservableExampleProtocol (ObservableExampleProtocolImpl (toObservable var)) $ \client -> do - observable <- intObservable client - retrieve observable `shouldReturn` 42 - atomically $ setObservableVar var 13 - retrieve observable `shouldReturn` 13 - - it "receives the current value when calling observe" $ rm do - var <- newObservableVarIO 41 - - withStandaloneClient @ObservableExampleProtocol (ObservableExampleProtocolImpl (toObservable var)) $ \client -> do - - resultVar <- newTVarIO ObservableLoading - observable <- intObservable client - - -- Change the value before calling `observe` - atomically $ setObservableVar var 42 - - observeIO_ observable $ \msg -> writeTVar resultVar msg - - liftIO $ join $ atomically $ readTVar resultVar >>= - \case - ObservableValue x -> pure $ x `shouldBe` 42 - ObservableLoading -> retry - ObservableNotAvailable ex -> pure $ throwIO ex - - it "receives continuous updates when observing" $ rm do - var <- newObservableVarIO 42 - withStandaloneClient @ObservableExampleProtocol (ObservableExampleProtocolImpl (toObservable var)) $ \client -> do - resultVar <- newTVarIO ObservableLoading - observable <- intObservable client - - observeIO_ observable $ \msg -> writeTVar resultVar msg - - let latestShouldBe = \expected -> liftIO $ join $ atomically $ readTVar resultVar >>= - \case - -- Send and receive are running asynchronously, so this retries until the expected value is received. - -- Blocks forever if the wrong or no value is received. - ObservableValue x -> if x == expected then pure (pure ()) else retry - ObservableLoading -> retry - ObservableNotAvailable ex -> pure $ throwIO ex - - latestShouldBe 42 - atomically $ setObservableVar var 13 - latestShouldBe 13 - atomically $ setObservableVar var (-1) - latestShouldBe (-1) - atomically $ setObservableVar var 42 - latestShouldBe 42 - - it "receives no further updates after unsubscribing" $ rm do - var <- newObservableVarIO 42 - withStandaloneClient @ObservableExampleProtocol (ObservableExampleProtocolImpl (toObservable var)) $ \client -> do - resultVar <- newTVarIO ObservableLoading - observable <- intObservable client - - disposer <- observeIO observable $ \msg -> writeTVar resultVar msg - - let latestShouldBe = \expected -> liftIO $ join $ atomically $ readTVar resultVar >>= - \case - -- Send and receive are running asynchronously, so this retries until the expected value is received. - -- Blocks forever if the wrong or no value is received. - ObservableValue x -> if x < 0 - then pure (fail "received a message after unsubscribing") - else if x == expected then pure (pure ()) else retry - ObservableLoading -> retry - ObservableNotAvailable ex -> pure $ throwIO ex - - latestShouldBe 42 - atomically $ setObservableVar var 13 - latestShouldBe 13 - atomically $ setObservableVar var 42 - latestShouldBe 42 - - dispose disposer - - atomically $ setObservableVar var (-1) - liftIO $ threadDelay 10000 - - latestShouldBe 42 +-- describe "ObservableExample" $ do +-- it "can retrieve values" $ rm do +-- var <- newObservableVarIO 42 +-- withStandaloneClient @ObservableExampleProtocol (ObservableExampleProtocolImpl (toObservable var)) $ \client -> do +-- observable <- intObservable client +-- retrieve observable `shouldReturn` 42 +-- atomically $ setObservableVar var 13 +-- retrieve observable `shouldReturn` 13 +-- +-- it "receives the current value when calling observe" $ rm do +-- var <- newObservableVarIO 41 +-- +-- withStandaloneClient @ObservableExampleProtocol (ObservableExampleProtocolImpl (toObservable var)) $ \client -> do +-- +-- resultVar <- newTVarIO ObservableLoading +-- observable <- intObservable client +-- +-- -- Change the value before calling `observe` +-- atomically $ setObservableVar var 42 +-- +-- observeIO_ observable $ \msg -> writeTVar resultVar msg +-- +-- liftIO $ join $ atomically $ readTVar resultVar >>= +-- \case +-- ObservableValue x -> pure $ x `shouldBe` 42 +-- ObservableLoading -> retry +-- ObservableNotAvailable ex -> pure $ throwIO ex +-- +-- it "receives continuous updates when observing" $ rm do +-- var <- newObservableVarIO 42 +-- withStandaloneClient @ObservableExampleProtocol (ObservableExampleProtocolImpl (toObservable var)) $ \client -> do +-- resultVar <- newTVarIO ObservableLoading +-- observable <- intObservable client +-- +-- observeIO_ observable $ \msg -> writeTVar resultVar msg +-- +-- let latestShouldBe = \expected -> liftIO $ join $ atomically $ readTVar resultVar >>= +-- \case +-- -- Send and receive are running asynchronously, so this retries until the expected value is received. +-- -- Blocks forever if the wrong or no value is received. +-- ObservableValue x -> if x == expected then pure (pure ()) else retry +-- ObservableLoading -> retry +-- ObservableNotAvailable ex -> pure $ throwIO ex +-- +-- latestShouldBe 42 +-- atomically $ setObservableVar var 13 +-- latestShouldBe 13 +-- atomically $ setObservableVar var (-1) +-- latestShouldBe (-1) +-- atomically $ setObservableVar var 42 +-- latestShouldBe 42 +-- +-- it "receives no further updates after unsubscribing" $ rm do +-- var <- newObservableVarIO 42 +-- withStandaloneClient @ObservableExampleProtocol (ObservableExampleProtocolImpl (toObservable var)) $ \client -> do +-- resultVar <- newTVarIO ObservableLoading +-- observable <- intObservable client +-- +-- disposer <- observeIO observable $ \msg -> writeTVar resultVar msg +-- +-- let latestShouldBe = \expected -> liftIO $ join $ atomically $ readTVar resultVar >>= +-- \case +-- -- Send and receive are running asynchronously, so this retries until the expected value is received. +-- -- Blocks forever if the wrong or no value is received. +-- ObservableValue x -> if x < 0 +-- then pure (fail "received a message after unsubscribing") +-- else if x == expected then pure (pure ()) else retry +-- ObservableLoading -> retry +-- ObservableNotAvailable ex -> pure $ throwIO ex +-- +-- latestShouldBe 42 +-- atomically $ setObservableVar var 13 +-- latestShouldBe 13 +-- atomically $ setObservableVar var 42 +-- latestShouldBe 42 +-- +-- dispose disposer +-- +-- atomically $ setObservableVar var (-1) +-- liftIO $ threadDelay 10000 +-- +-- latestShouldBe 42 -- GitLab