-- For rpc: {-# LANGUAGE TypeApplications #-} {-# LANGUAGE TypeFamilies #-} {-# LANGUAGE DeriveGeneric #-} {-# LANGUAGE DeriveAnyClass #-} {-# LANGUAGE EmptyDataDeriving #-} -- Print generated rpc code during build {-# OPTIONS_GHC -ddump-splices #-} module Quasar.NetworkSpec where import Control.Concurrent (threadDelay) import Control.Concurrent.MVar import Control.Concurrent.STM import Control.Exception (toException) import Control.Monad.IO.Class (MonadIO, liftIO) import Quasar.Prelude import Quasar.Awaitable import Quasar.Core import Quasar.Network import Quasar.Network.Runtime (withStandaloneClient) import Quasar.Network.TH (makeRpc) import Quasar.Observable import Test.Hspec import Test.QuickCheck import Test.QuickCheck.Monadic shouldReturnAsync :: (HasCallStack, IsAwaitable r a, Show r, Eq r) => AsyncIO a -> r -> AsyncIO () action `shouldReturnAsync` expected = action >>= await >>= liftIO . (`shouldBe` expected) $(makeRpc $ rpcApi "Example" $ do rpcFunction "fixedHandler42" $ do addArgument "arg" [t|Int|] addResult "result" [t|Bool|] setFixedHandler [| pure . pure . (== 42) |] rpcFunction "fixedHandlerInc" $ do addArgument "arg" [t|Int|] addResult "result" [t|Int|] setFixedHandler [| pure . pure . (+ 1) |] rpcFunction "multiArgs" $ do addArgument "one" [t|Int|] addArgument "two" [t|Int|] addArgument "three" [t|Bool|] addResult "result" [t|Int|] addResult "result2" [t|Bool|] rpcFunction "noArgs" $ do addResult "result" [t|Int|] rpcFunction "noResponse" $ do addArgument "arg" [t|Int|] rpcFunction "noNothing" $ pure () ) $(makeRpc $ rpcApi "StreamExample" $ do rpcFunction "createMultiplyStream" $ do addStream "stream" [t|(Int, Int)|] [t|Int|] rpcFunction "createStreams" $ do addStream "stream1" [t|Bool|] [t|Bool|] addStream "stream2" [t|Int|] [t|Int|] ) $(makeRpc $ rpcApi "ObservableExample" $ do rpcObservable "intObservable" [t|Int|] ) exampleProtocolImpl :: ExampleProtocolImpl exampleProtocolImpl = ExampleProtocolImpl { multiArgsImpl = \one two three -> pure $ pure (one + two, not three), noArgsImpl = pure $ pure 42, noResponseImpl = \_foo -> pure (), noNothingImpl = pure () } streamExampleProtocolImpl :: StreamExampleProtocolImpl streamExampleProtocolImpl = StreamExampleProtocolImpl { createMultiplyStreamImpl, createStreamsImpl } where createMultiplyStreamImpl :: MonadIO m => Stream Int (Int, Int) -> m () createMultiplyStreamImpl stream = streamSetHandler stream $ \(x, y) -> streamSend stream (x * y) createStreamsImpl :: MonadIO m => Stream Bool Bool -> Stream Int Int -> m () createStreamsImpl stream1 stream2 = do streamSetHandler stream1 $ streamSend stream1 streamSetHandler stream2 $ streamSend stream2 spec :: Spec spec = parallel $ do describe "Example" $ do it "works" $ do withStandaloneClient @ExampleProtocol exampleProtocolImpl $ \client -> do (awaitIO =<< fixedHandler42 client 5) `shouldReturn` False (awaitIO =<< fixedHandler42 client 42) `shouldReturn` True (awaitIO =<< fixedHandlerInc client 41) `shouldReturn` 42 (awaitIO =<< multiArgs client 10 3 False) `shouldReturn` (13, True) noResponse client 1337 noNothing client describe "StreamExample" $ do it "can open and close a stream" $ do withStandaloneClient @StreamExampleProtocol streamExampleProtocolImpl $ \client -> do streamClose =<< createMultiplyStream client it "can open multiple streams in a single rpc call" $ do withStandaloneClient @StreamExampleProtocol streamExampleProtocolImpl $ \client -> do (stream1, stream2) <- createStreams client streamClose stream1 streamClose stream2 aroundAll (\x -> withStandaloneClient @StreamExampleProtocol streamExampleProtocolImpl $ \client -> do resultMVar <- liftIO newEmptyMVar stream <- createMultiplyStream client streamSetHandler stream $ putMVar resultMVar liftIO $ x (resultMVar, stream) ) $ it "can send data over the stream" $ \(resultMVar, stream) -> property $ \(x, y) -> monadicIO $ do liftIO $ streamSend stream (x, y) liftIO $ takeMVar resultMVar `shouldReturn` x * y describe "ObservableExample" $ do it "can retrieve values" $ do var <- newObservableVar 42 withStandaloneClient @ObservableExampleProtocol (ObservableExampleProtocolImpl (toObservable var)) $ \client -> do observable <- intObservable client retrieveIO observable `shouldReturn` 42 setObservableVar var 13 retrieveIO observable `shouldReturn` 13 it "receives the current value when calling observe" $ do var <- newObservableVar 41 withStandaloneClient @ObservableExampleProtocol (ObservableExampleProtocolImpl (toObservable var)) $ \client -> do resultVar <- newTVarIO ObservableLoading observable <- intObservable client -- Change the value before calling `observe` setObservableVar var 42 void $ observe observable $ atomically . writeTVar resultVar join $ atomically $ readTVar resultVar >>= \case ObservableUpdate x -> pure $ x `shouldBe` 42 ObservableLoading -> retry ObservableNotAvailable ex -> pure $ throwIO ex it "receives continuous updates when observing" $ do var <- newObservableVar 42 withStandaloneClient @ObservableExampleProtocol (ObservableExampleProtocolImpl (toObservable var)) $ \client -> do resultVar <- newTVarIO ObservableLoading observable <- intObservable client void $ observe observable $ atomically . writeTVar resultVar let latestShouldBe = \expected -> join $ atomically $ readTVar resultVar >>= \case -- Send and receive are running asynchronously, so this retries until the expected value is received. -- Blocks forever if the wrong or no value is received. ObservableUpdate x -> if (x == expected) then pure (pure ()) else retry ObservableLoading -> retry ObservableNotAvailable ex -> pure $ throwIO ex latestShouldBe 42 setObservableVar var 13 latestShouldBe 13 setObservableVar var (-1) latestShouldBe (-1) setObservableVar var 42 latestShouldBe 42 it "receives no further updates after disposing the callback registration" $ do var <- newObservableVar 42 withStandaloneClient @ObservableExampleProtocol (ObservableExampleProtocolImpl (toObservable var)) $ \client -> do resultVar <- newTVarIO ObservableLoading observable <- intObservable client disposable <- observe observable $ atomically . writeTVar resultVar let latestShouldBe = \expected -> join $ atomically $ readTVar resultVar >>= \case -- Send and receive are running asynchronously, so this retries until the expected value is received. -- Blocks forever if the wrong or no value is received. ObservableUpdate x -> if (x < 0) then pure (fail "received a message after unsubscribing") else if (x == expected) then pure (pure ()) else retry ObservableLoading -> retry ObservableNotAvailable ex -> pure $ throwIO ex latestShouldBe 42 setObservableVar var 13 latestShouldBe 13 setObservableVar var 42 latestShouldBe 42 disposeIO disposable setObservableVar var (-1) threadDelay 10000 latestShouldBe 42