Skip to content
Snippets Groups Projects
Commit 7e01fd0c authored by Jens Nolte's avatar Jens Nolte
Browse files

Update quasar and change affected code

parent b18edec9
No related branches found
No related tags found
No related merge requests found
...@@ -21,11 +21,11 @@ ...@@ -21,11 +21,11 @@
}, },
"locked": { "locked": {
"host": "git.c3pb.de", "host": "git.c3pb.de",
"lastModified": 1629328844, "lastModified": 1630286998,
"narHash": "sha256-4AiAjuSuTneWidipxxTu60xqztbum8OuTuKI9pWpUsQ=", "narHash": "sha256-2rGt7QPAHXuHP8mn/vxzXJotrGcv93jyQTZTPrwZ1w8=",
"owner": "jens", "owner": "jens",
"repo": "quasar", "repo": "quasar",
"rev": "67efa0e198ac78aaaf308c2067019574548acf7c", "rev": "499e025611d10540dbebf0e2732a0316a6e111bd",
"type": "gitlab" "type": "gitlab"
}, },
"original": { "original": {
......
...@@ -107,12 +107,17 @@ instance Exception MultiplexerException ...@@ -107,12 +107,17 @@ instance Exception MultiplexerException
data Channel = Channel { data Channel = Channel {
channelId :: ChannelId, channelId :: ChannelId,
resourceManager :: ResourceManager,
worker :: MultiplexerWorker, worker :: MultiplexerWorker,
stateMVar :: MVar ChannelState, stateMVar :: MVar ChannelState,
sendStateMVar :: MVar ChannelSendState, sendStateMVar :: MVar ChannelSendState,
receiveStateMVar :: MVar ChannelReceiveState, receiveStateMVar :: MVar ChannelReceiveState,
handlerAtVar :: AtVar InternalChannelMessageHandler handlerAtVar :: AtVar InternalChannelMessageHandler
} }
instance HasResourceManager Channel where
getResourceManager channel = channel.resourceManager
data ChannelState = ChannelState { data ChannelState = ChannelState {
connectionState :: ChannelConnectivity, connectionState :: ChannelConnectivity,
children :: [Channel] children :: [Channel]
......
...@@ -47,8 +47,9 @@ import Data.Binary (Binary, encode, decodeOrFail) ...@@ -47,8 +47,9 @@ import Data.Binary (Binary, encode, decodeOrFail)
import qualified Data.ByteString.Lazy as BSL import qualified Data.ByteString.Lazy as BSL
import qualified Data.HashMap.Strict as HM import qualified Data.HashMap.Strict as HM
import qualified Network.Socket as Socket import qualified Network.Socket as Socket
import Quasar.Async
import Quasar.Awaitable import Quasar.Awaitable
import Quasar.Core import Quasar.Disposable
import Quasar.Network.Connection import Quasar.Network.Connection
import Quasar.Network.Multiplexer import Quasar.Network.Multiplexer
import Quasar.Prelude import Quasar.Prelude
...@@ -65,7 +66,7 @@ type ProtocolResponseWrapper p = (MessageId, ProtocolResponse p) ...@@ -65,7 +66,7 @@ type ProtocolResponseWrapper p = (MessageId, ProtocolResponse p)
class RpcProtocol p => HasProtocolImpl p where class RpcProtocol p => HasProtocolImpl p where
type ProtocolImpl p type ProtocolImpl p
handleRequest :: HasResourceManager m => ProtocolImpl p -> Channel -> ProtocolRequest p -> [Channel] -> m (Maybe (Task (ProtocolResponse p))) handleRequest :: MonadAsync m => ProtocolImpl p -> Channel -> ProtocolRequest p -> [Channel] -> m (Maybe (Task (ProtocolResponse p)))
data Client p = Client { data Client p = Client {
...@@ -131,8 +132,7 @@ serverHandleChannelMessage protocolImpl channel resources msg = case decodeOrFai ...@@ -131,8 +132,7 @@ serverHandleChannelMessage protocolImpl channel resources msg = case decodeOrFai
where where
serverHandleChannelRequest :: [Channel] -> ProtocolRequest p -> IO () serverHandleChannelRequest :: [Channel] -> ProtocolRequest p -> IO ()
serverHandleChannelRequest channels req = do serverHandleChannelRequest channels req = do
-- TODO resource manager should belong to the current channel/api onResourceManager channel $
withDefaultResourceManager $
handleRequest @p protocolImpl channel req channels >>= \case handleRequest @p protocolImpl channel req channels >>= \case
Nothing -> pure () Nothing -> pure ()
Just task -> do Just task -> do
......
...@@ -5,8 +5,9 @@ module Quasar.Network.Runtime.Observable ( ...@@ -5,8 +5,9 @@ module Quasar.Network.Runtime.Observable (
) where ) where
import Data.Binary (Binary) import Data.Binary (Binary)
import Quasar.Async
import Quasar.Awaitable import Quasar.Awaitable
import Quasar.Core import Quasar.Disposable
import Quasar.Network.Exception import Quasar.Network.Exception
import Quasar.Network.Runtime import Quasar.Network.Runtime
import Quasar.Observable import Quasar.Observable
...@@ -44,11 +45,10 @@ newObservableStub startRetrieveRequest startObserveRequest = pure uncachedObserv ...@@ -44,11 +45,10 @@ newObservableStub startRetrieveRequest startObserveRequest = pure uncachedObserv
stream <- startObserveRequest stream <- startObserveRequest
streamSetHandler stream (callback . unpackObservableMessage) streamSetHandler stream (callback . unpackObservableMessage)
synchronousDisposable $ streamClose stream synchronousDisposable $ streamClose stream
retrieveFn :: forall m. HasResourceManager m => m (Task v) retrieveFn :: forall m. MonadAsync m => m (Task v)
retrieveFn = toTask <$> startRetrieveRequest retrieveFn = toTask <$> startRetrieveRequest
observeToStream :: (Binary v, HasResourceManager m) => Observable v -> Stream (PackedObservableMessage v) Void -> m () observeToStream :: (Binary v, MonadAsync m) => Observable v -> Stream (PackedObservableMessage v) Void -> m ()
observeToStream observable stream = do observeToStream observable stream =
_disposable <- liftIO $ observe observable $ streamSend stream . packObservableMessage asyncObserve_ observable \msg ->
-- TODO: dispose when the stream is closed streamSend stream $ packObservableMessage msg
pure ()
...@@ -24,8 +24,8 @@ import Data.Maybe (isJust, isNothing) ...@@ -24,8 +24,8 @@ import Data.Maybe (isJust, isNothing)
import GHC.Records.Compat (HasField) import GHC.Records.Compat (HasField)
import Language.Haskell.TH hiding (interruptible) import Language.Haskell.TH hiding (interruptible)
import Language.Haskell.TH.Syntax import Language.Haskell.TH.Syntax
import Quasar.Async
import Quasar.Awaitable import Quasar.Awaitable
import Quasar.Core
import Quasar.Network.Multiplexer import Quasar.Network.Multiplexer
import Quasar.Network.Runtime import Quasar.Network.Runtime
import Quasar.Network.Runtime.Observable import Quasar.Network.Runtime.Observable
...@@ -559,7 +559,7 @@ createResource RequestCreateChannel channelE = [|pure $channelE|] ...@@ -559,7 +559,7 @@ createResource RequestCreateChannel channelE = [|pure $channelE|]
createResource (RequestCreateStream up down) channelE = [|newStream $channelE|] createResource (RequestCreateStream up down) channelE = [|newStream $channelE|]
implResultType :: Request -> Q Type implResultType :: Request -> Q Type
implResultType req = [t|forall m. HasResourceManager m => m $(resultType)|] implResultType req = [t|forall m. MonadAsync m => m $(resultType)|]
where where
resultType = case req.mResponse of resultType = case req.mResponse of
Nothing -> [t|()|] Nothing -> [t|()|]
......
...@@ -16,8 +16,9 @@ import Control.Concurrent.STM ...@@ -16,8 +16,9 @@ import Control.Concurrent.STM
import Control.Exception (toException) import Control.Exception (toException)
import Control.Monad.IO.Class (MonadIO, liftIO) import Control.Monad.IO.Class (MonadIO, liftIO)
import Quasar.Prelude import Quasar.Prelude
import Quasar.Async
import Quasar.Awaitable import Quasar.Awaitable
import Quasar.Core import Quasar.Disposable
import Quasar.Network import Quasar.Network
import Quasar.Network.Runtime (withStandaloneClient) import Quasar.Network.Runtime (withStandaloneClient)
import Quasar.Network.TH (makeRpc) import Quasar.Network.TH (makeRpc)
...@@ -26,8 +27,6 @@ import Test.Hspec ...@@ -26,8 +27,6 @@ import Test.Hspec
import Test.QuickCheck import Test.QuickCheck
import Test.QuickCheck.Monadic import Test.QuickCheck.Monadic
shouldReturnAsync :: (HasCallStack, IsAwaitable r a, Show r, Eq r) => AsyncIO a -> r -> AsyncIO ()
action `shouldReturnAsync` expected = action >>= await >>= liftIO . (`shouldBe` expected)
$(makeRpc $ rpcApi "Example" $ do $(makeRpc $ rpcApi "Example" $ do
rpcFunction "fixedHandler42" $ do rpcFunction "fixedHandler42" $ do
...@@ -96,10 +95,10 @@ spec = parallel $ do ...@@ -96,10 +95,10 @@ spec = parallel $ do
describe "Example" $ do describe "Example" $ do
it "works" $ do it "works" $ do
withStandaloneClient @ExampleProtocol exampleProtocolImpl $ \client -> do withStandaloneClient @ExampleProtocol exampleProtocolImpl $ \client -> do
(awaitIO =<< fixedHandler42 client 5) `shouldReturn` False (await =<< fixedHandler42 client 5) `shouldReturn` False
(awaitIO =<< fixedHandler42 client 42) `shouldReturn` True (await =<< fixedHandler42 client 42) `shouldReturn` True
(awaitIO =<< fixedHandlerInc client 41) `shouldReturn` 42 (await =<< fixedHandlerInc client 41) `shouldReturn` 42
(awaitIO =<< multiArgs client 10 3 False) `shouldReturn` (13, True) (await =<< multiArgs client 10 3 False) `shouldReturn` (13, True)
noResponse client 1337 noResponse client 1337
noNothing client noNothing client
...@@ -136,40 +135,45 @@ spec = parallel $ do ...@@ -136,40 +135,45 @@ spec = parallel $ do
var <- newObservableVar 41 var <- newObservableVar 41
withStandaloneClient @ObservableExampleProtocol (ObservableExampleProtocolImpl (toObservable var)) $ \client -> do withStandaloneClient @ObservableExampleProtocol (ObservableExampleProtocolImpl (toObservable var)) $ \client -> do
resultVar <- newTVarIO ObservableLoading resultVar <- newTVarIO ObservableLoading
observable <- intObservable client observable <- intObservable client
-- Change the value before calling `observe` -- Change the value before calling `observe`
setObservableVar var 42 setObservableVar var 42
void $ observe observable $ atomically . writeTVar resultVar withOnResourceManager do
join $ atomically $ readTVar resultVar >>= asyncObserve_ observable $ liftIO . atomically . writeTVar resultVar
\case
ObservableUpdate x -> pure $ x `shouldBe` 42 liftIO $ join $ atomically $ readTVar resultVar >>=
ObservableLoading -> retry \case
ObservableNotAvailable ex -> pure $ throwIO ex ObservableUpdate x -> pure $ x `shouldBe` 42
ObservableLoading -> retry
ObservableNotAvailable ex -> pure $ throwIO ex
it "receives continuous updates when observing" $ do it "receives continuous updates when observing" $ do
var <- newObservableVar 42 var <- newObservableVar 42
withStandaloneClient @ObservableExampleProtocol (ObservableExampleProtocolImpl (toObservable var)) $ \client -> do withStandaloneClient @ObservableExampleProtocol (ObservableExampleProtocolImpl (toObservable var)) $ \client -> do
resultVar <- newTVarIO ObservableLoading resultVar <- newTVarIO ObservableLoading
observable <- intObservable client observable <- intObservable client
void $ observe observable $ atomically . writeTVar resultVar withOnResourceManager do
let latestShouldBe = \expected -> join $ atomically $ readTVar resultVar >>= void $ asyncObserve observable $ liftIO . atomically . writeTVar resultVar
\case
-- Send and receive are running asynchronously, so this retries until the expected value is received. let latestShouldBe = \expected -> liftIO $ join $ atomically $ readTVar resultVar >>=
-- Blocks forever if the wrong or no value is received. \case
ObservableUpdate x -> if (x == expected) then pure (pure ()) else retry -- Send and receive are running asynchronously, so this retries until the expected value is received.
ObservableLoading -> retry -- Blocks forever if the wrong or no value is received.
ObservableNotAvailable ex -> pure $ throwIO ex ObservableUpdate x -> if (x == expected) then pure (pure ()) else retry
ObservableLoading -> retry
latestShouldBe 42 ObservableNotAvailable ex -> pure $ throwIO ex
setObservableVar var 13
latestShouldBe 13 latestShouldBe 42
setObservableVar var (-1) setObservableVar var 13
latestShouldBe (-1) latestShouldBe 13
setObservableVar var 42 setObservableVar var (-1)
latestShouldBe 42 latestShouldBe (-1)
setObservableVar var 42
latestShouldBe 42
it "receives no further updates after disposing the callback registration" $ do it "receives no further updates after disposing the callback registration" $ do
pendingWith "not implemented" pendingWith "not implemented"
...@@ -178,27 +182,28 @@ spec = parallel $ do ...@@ -178,27 +182,28 @@ spec = parallel $ do
withStandaloneClient @ObservableExampleProtocol (ObservableExampleProtocolImpl (toObservable var)) $ \client -> do withStandaloneClient @ObservableExampleProtocol (ObservableExampleProtocolImpl (toObservable var)) $ \client -> do
resultVar <- newTVarIO ObservableLoading resultVar <- newTVarIO ObservableLoading
observable <- intObservable client observable <- intObservable client
disposable <- observe observable $ atomically . writeTVar resultVar withOnResourceManager do
let latestShouldBe = \expected -> join $ atomically $ readTVar resultVar >>= disposable <- asyncObserve observable $ liftIO . atomically . writeTVar resultVar
\case
-- Send and receive are running asynchronously, so this retries until the expected value is received. let latestShouldBe = \expected -> liftIO $ join $ atomically $ readTVar resultVar >>=
-- Blocks forever if the wrong or no value is received. \case
ObservableUpdate x -> if (x < 0) -- Send and receive are running asynchronously, so this retries until the expected value is received.
then pure (fail "received a message after unsubscribing") -- Blocks forever if the wrong or no value is received.
else if (x == expected) then pure (pure ()) else retry ObservableUpdate x -> if (x < 0)
ObservableLoading -> retry then pure (fail "received a message after unsubscribing")
ObservableNotAvailable ex -> pure $ throwIO ex else if (x == expected) then pure (pure ()) else retry
ObservableLoading -> retry
latestShouldBe 42 ObservableNotAvailable ex -> pure $ throwIO ex
setObservableVar var 13
latestShouldBe 13 latestShouldBe 42
setObservableVar var 42 setObservableVar var 13
latestShouldBe 42 latestShouldBe 13
setObservableVar var 42
disposeIO disposable latestShouldBe 42
setObservableVar var (-1) disposeAndAwait disposable
threadDelay 10000
setObservableVar var (-1)
latestShouldBe 42 liftIO $ threadDelay 10000
latestShouldBe 42
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment