From 6c2d0f3bd58b03677a592a720a890a1b3937659b Mon Sep 17 00:00:00 2001
From: Jens Nolte <git@queezle.net>
Date: Wed, 6 Apr 2022 18:04:31 +0200
Subject: [PATCH] Reimplement observable (completes quasar update)

---
 flake.lock                               |   6 +-
 quasar-network.cabal                     |   3 +-
 src/Quasar/Network/Multiplexer.hs        |  65 +++++++---
 src/Quasar/Network/Runtime.hs            |  17 ++-
 src/Quasar/Network/Runtime/Observable.hs | 149 ++++++++++++++++-------
 src/Quasar/Network/TH.hs                 |   4 +-
 test/Quasar/Network/MultiplexerSpec.hs   |   3 +-
 test/Quasar/NetworkSpec.hs               |  16 +--
 8 files changed, 180 insertions(+), 83 deletions(-)

diff --git a/flake.lock b/flake.lock
index cee4fbb..d7a23bb 100644
--- a/flake.lock
+++ b/flake.lock
@@ -34,11 +34,11 @@
       },
       "locked": {
         "host": "git.c3pb.de",
-        "lastModified": 1649109956,
-        "narHash": "sha256-kPlVpOJPlSojhPY/SMkhbBHrJ/NdDi3fbAbGsAZS3II=",
+        "lastModified": 1649186839,
+        "narHash": "sha256-hYmuv40Wy302gWHvsEb8FGmMX1fxZlpq77SLE7UxQJQ=",
         "owner": "jens",
         "repo": "quasar",
-        "rev": "f85f15178852c991dd6c911d8a02a886dc716271",
+        "rev": "82ebde4fe07002ba7c7a4d5bb4e21d63f30d4eef",
         "type": "gitlab"
       },
       "original": {
diff --git a/quasar-network.cabal b/quasar-network.cabal
index 9ed6776..d40329c 100644
--- a/quasar-network.cabal
+++ b/quasar-network.cabal
@@ -88,9 +88,10 @@ library
     Quasar.Network.Exception
     Quasar.Network.Multiplexer
     Quasar.Network.Runtime
-    Quasar.Network.Runtime.Observable
     Quasar.Network.SocketLocation
     Quasar.Network.TH
+  other-modules:
+    Quasar.Network.Runtime.Observable
   build-depends:
     base >=4.7 && <5,
     async,
diff --git a/src/Quasar/Network/Multiplexer.hs b/src/Quasar/Network/Multiplexer.hs
index 7c6ee92..0cb28f5 100644
--- a/src/Quasar/Network/Multiplexer.hs
+++ b/src/Quasar/Network/Multiplexer.hs
@@ -11,9 +11,12 @@ module Quasar.Network.Multiplexer (
   defaultMessageConfiguration,
   channelSend,
   sendChannelMessage,
+  sendChannelMessageDeferred,
   unsafeQueueChannelMessage,
+  unsafeQueueChannelMessageSimple,
   channelSend_,
   channelSendSimple,
+  channelSendSimpleDeferred,
 
   -- ** Receiving messages
   ReceivedMessageResources(..),
@@ -188,7 +191,7 @@ newRootChannel multiplexer = do
       sentCloseMessage
     }
 
-  registerDisposeActionIO_ $ atomically $ sendChannelCloseMessage channel
+  registerDisposeTransactionIO_ $ sendChannelCloseMessage channel
 
   pure channel
 
@@ -371,8 +374,10 @@ newMultiplexerInternal side connection = disposeOnError do
         when (isNothing r) do
           fulfillPromise multiplexerException $ ConnectionLost CloseTimeoutReached
 
-        dispose sendTask
-        dispose receiveTask
+        sf <- disposeEventuallyIO sendTask
+        rf <- disposeEventuallyIO receiveTask
+        await $ sf <> rf
+
         connection.close
 
         fulfillPromise multiplexerResult =<< peekFuture (toFuture multiplexerException)
@@ -445,10 +450,10 @@ sendThread multiplexer sendFn = do
               -- Exit when the receive thread has stopped and there is no error and no message left to send
               [] -> pure () <$ awaitSTM multiplexer.receiveThreadCompleted
               _ -> pure do
-                bs <- execWriterT do
+                bytes <- execWriterT do
                   -- outbox is a list that is used as a queue, so it has to be reversed to preserve the correct order
                   mapM_ formatMessage (reverse messages)
-                liftIO $ send bs
+                liftIO $ send bytes
                 sendLoop
     send :: MonadIO m => Put -> m ()
     send chunks = liftIO $ sendFn (Binary.runPut chunks) `catchAll` (throwM . ConnectionLost . SendFailed)
@@ -491,12 +496,12 @@ receiveThread :: Multiplexer -> IO BS.ByteString -> IO ()
 receiveThread multiplexer readFn = do
   rootChannel <- lookupChannel 0
   chunk <- case multiplexer.side of
-    MultiplexerSideA -> read
+    MultiplexerSideA -> readBytes
     MultiplexerSideB -> checkMagicBytes
   evalStateT (multiplexerLoop rootChannel) chunk
   where
-    read :: IO BS.ByteString
-    read = readFn `catchAll` \ex -> throwM $ ConnectionLost $ ReceiveFailed ex
+    readBytes :: IO BS.ByteString
+    readBytes = readFn `catchAll` \ex -> throwM $ ConnectionLost $ ReceiveFailed ex
 
     -- | Reads and verifies magic bytes. Returns bytes left over from the received chunk(s).
     checkMagicBytes :: IO BS.ByteString
@@ -505,7 +510,7 @@ receiveThread multiplexer readFn = do
         magicBytesLength = BS.length magicBytes
         checkMagicBytes' :: BS.ByteString -> IO BS.ByteString
         checkMagicBytes' chunk@((< magicBytesLength) . BS.length -> True) = do
-          next <- read `catchAll` \_ -> throwM (InvalidMagicBytes chunk)
+          next <- readBytes `catchAll` \_ -> throwM (InvalidMagicBytes chunk)
           checkMagicBytes' $ chunk <> next
         checkMagicBytes' (BS.splitAt magicBytesLength -> (bytes, leftovers)) = do
           when (bytes /= magicBytes) $ throwM $ InvalidMagicBytes bytes
@@ -529,7 +534,7 @@ receiveThread multiplexer readFn = do
       where
         stepDecoder :: Decoder a -> IO (a, BS.ByteString)
         stepDecoder (Fail _ _ errMsg) = errorHandler errMsg
-        stepDecoder (Partial feedFn) = stepDecoder . feedFn . Just =<< read
+        stepDecoder (Partial feedFn) = stepDecoder . feedFn . Just =<< readBytes
         stepDecoder (Done leftovers _ msg) = pure (msg, leftovers)
 
     getMultiplexerMessage :: StateT BS.ByteString IO MultiplexerMessage
@@ -538,7 +543,7 @@ receiveThread multiplexer readFn = do
         protocolException $ "Failed to parse protocol message: " <> errMsg
 
     execReceivedMultiplexerMessage :: ReceiveThreadState -> MultiplexerMessage -> StateT BS.ByteString IO (Maybe ReceiveThreadState)
-    execReceivedMultiplexerMessage Nothing (ChannelMessage _ messageLength) = undefined
+    execReceivedMultiplexerMessage Nothing (ChannelMessage _ _) = undefined
     execReceivedMultiplexerMessage state@(Just channel) (ChannelMessage newChannelCount messageLength) = do
       join $ liftIO $ atomically do
         closedByRemote <- readTVar channel.receivedCloseMessage
@@ -561,7 +566,7 @@ receiveThread multiplexer readFn = do
         Nothing -> protocolException $
           mconcat ["Failed to switch to channel ", show channelId, " (invalid id)"]
         Just channel -> do
-          receivedClose <- atomically (readTVar channel.receivedCloseMessage)
+          receivedClose <- readTVarIO channel.receivedCloseMessage
           when receivedClose $ protocolException $
             mconcat ["Failed to switch to channel ", show channelId, " (channel is closed)"]
           pure (Just (Just channel))
@@ -602,7 +607,7 @@ receiveThread multiplexer readFn = do
         -- Signal to handler, that receiving is completed
         runHandler (InternalMessageHandler fn) 0 leftovers = leftovers <$ fn Nothing
         -- Read more data
-        runHandler handler remaining (BS.null -> True) = runHandler handler remaining =<< read
+        runHandler handler remaining (BS.null -> True) = runHandler handler remaining =<< readBytes
         -- Feed remaining data into handler
         runHandler (InternalMessageHandler fn) remaining chunk
           | chunkLength <= remaining = do
@@ -621,7 +626,7 @@ receiveThread multiplexer readFn = do
         go :: MessageLength -> BS.ByteString -> IO BS.ByteString
         go remaining chunk
           | chunkLength <= remaining = do
-            go (remaining - chunkLength) =<< read
+            go (remaining - chunkLength) =<< readBytes
           | otherwise = do
             pure $ BS.drop (fromIntegral remaining) chunk
           where
@@ -637,17 +642,33 @@ sendChannelMessage channel@Channel{multiplexer} messageConfiguration payload mes
   -- Locking the 'outboxGuard' guarantees fairness when sending messages concurrently (it also prevents unnecessary
   -- STM retries)
   withMVar multiplexer.outboxGuard \_ ->
-    atomically $ sendChannelMessageInternal BlockUntilReady channel messageConfiguration payload messageIdHook
+    atomically $ sendChannelMessageInternal BlockUntilReady channel messageConfiguration (pure payload) messageIdHook
+
+sendChannelMessageDeferred :: MonadIO m => Channel -> MessageConfiguration -> STM BSL.ByteString -> (MessageId -> STM ()) -> m SentMessageResources
+sendChannelMessageDeferred channel@Channel{multiplexer} messageConfiguration payloadHook messageIdHook = liftIO do
+  -- Locking the 'outboxGuard' guarantees fairness when sending messages concurrently (it also prevents unnecessary
+  -- STM retries)
+  withMVar multiplexer.outboxGuard \_ ->
+    atomically $ sendChannelMessageInternal BlockUntilReady channel messageConfiguration payloadHook messageIdHook
 
 -- | Unsafely queue a network message to an unbounded send queue. This function does not block, even if `sendChannelMessage` would block. Queued messages will cause concurrent or following `sendChannelMessage`-calls to block until the queue is flushed.
 unsafeQueueChannelMessage :: Channel -> MessageConfiguration -> BSL.ByteString -> (MessageId -> STM ()) -> STM SentMessageResources
-unsafeQueueChannelMessage = sendChannelMessageInternal UnboundedQueue
+unsafeQueueChannelMessage channel messageConfiguration payload =
+  sendChannelMessageInternal UnboundedQueue channel messageConfiguration (pure payload)
+
+unsafeQueueChannelMessageSimple :: MonadSTM m => Channel -> BSL.ByteString -> m ()
+unsafeQueueChannelMessageSimple channel msg = liftSTM do
+  unsafeQueueChannelMessage channel defaultMessageConfiguration msg (const (pure ())) >>=
+    \case
+      -- Pattern match verifies no channels are created due to a bug
+      SentMessageResources{createdChannels=[]} -> pure ()
+      _ -> unreachableCodePathM
 
 
 data QueueBehavior = BlockUntilReady | UnboundedQueue
 
-sendChannelMessageInternal :: QueueBehavior -> Channel -> MessageConfiguration -> BSL.ByteString -> (MessageId -> STM ()) -> STM SentMessageResources
-sendChannelMessageInternal queueBehavior channel@Channel{multiplexer} MessageConfiguration{closeChannel, createChannels} payload messageIdHook = do
+sendChannelMessageInternal :: QueueBehavior -> Channel -> MessageConfiguration -> STM BSL.ByteString -> (MessageId -> STM ()) -> STM SentMessageResources
+sendChannelMessageInternal queueBehavior channel@Channel{multiplexer} MessageConfiguration{closeChannel, createChannels} payloadHook messageIdHook = do
   -- NOTE At most one message can be queued per STM transaction, so `sendChannelMessage` cannot be changed to STM
 
   -- Abort if the multiplexer is finished or currently cleaning up
@@ -664,6 +685,8 @@ sendChannelMessageInternal queueBehavior channel@Channel{multiplexer} MessageCon
       check $ null msgs
     UnboundedQueue -> pure ()
 
+  payload <- payloadHook
+
   -- Put the message into the outbox. It will be picked up by the send thread.
   let msg = OutboxSendMessage channel.channelId createChannels payload
   writeTVar multiplexer.outbox (msg:msgs)
@@ -698,6 +721,12 @@ channelSendSimple channel msg = liftIO do
   SentMessageResources{createdChannels=[]} <- channelSend channel defaultMessageConfiguration msg (const (pure ()))
   pure ()
 
+channelSendSimpleDeferred :: MonadIO m => Channel -> STM BSL.ByteString -> m ()
+channelSendSimpleDeferred channel payloadHook = liftIO do
+  -- Pattern match verifies no channels are created due to a bug
+  SentMessageResources{createdChannels=[]} <- sendChannelMessageDeferred channel defaultMessageConfiguration payloadHook (const (pure ()))
+  pure ()
+
 channelReportProtocolError :: MonadIO m => Channel -> String -> m b
 channelReportProtocolError = undefined
 
diff --git a/src/Quasar/Network/Runtime.hs b/src/Quasar/Network/Runtime.hs
index f8661ff..a63379f 100644
--- a/src/Quasar/Network/Runtime.hs
+++ b/src/Quasar/Network/Runtime.hs
@@ -25,8 +25,10 @@ module Quasar.Network.Runtime (
   -- * Stream
   Stream,
   streamSend,
+  streamSendDeferred,
   streamSetHandler,
   streamQuasar,
+  unsafeQueueStreamMessage,
 
   -- * Test implementation
   withStandaloneClient,
@@ -79,7 +81,7 @@ clientSend client config req = liftIO $ channelSend_ client.channel config (enco
 clientRequest :: forall p m a. (MonadIO m, RpcProtocol p) => Client p -> (ProtocolResponse p -> Maybe a) -> MessageConfiguration -> ProtocolRequest p -> m (Future a, SentMessageResources)
 clientRequest client checkResponse config req = do
   resultPromise <- newPromise
-  sentMessageResources <- liftIO $ channelSend client.channel config (encode req) \msgId ->
+  sentMessageResources <- liftIO $ sendChannelMessage client.channel config (encode req) \msgId ->
     modifyTVar client.callbacksVar $ HM.insert msgId (requestCompletedCallback resultPromise msgId)
   pure (toFuture resultPromise, sentMessageResources)
   where
@@ -93,7 +95,7 @@ clientRequest client checkResponse config req = do
         Just result -> fulfillPromise resultPromise result
 
 -- TODO use new direct decoder api instead
-clientHandleChannelMessage :: forall p. RpcProtocol p => Client p -> ReceivedMessageResources -> ProtocolResponseWrapper p -> QuasarIO ()
+clientHandleChannelMessage :: Client p -> ReceivedMessageResources -> ProtocolResponseWrapper p -> QuasarIO ()
 clientHandleChannelMessage client resources (requestId, resp) = liftIO clientHandleResponse
   where
     clientHandleResponse :: IO ()
@@ -136,6 +138,13 @@ newStream = liftIO . pure . Stream
 streamSend :: (Binary up, MonadIO m) => Stream up down -> up -> m ()
 streamSend (Stream channel) value = liftIO $ channelSendSimple channel (encode value)
 
+streamSendDeferred :: (Binary up, MonadIO m) => Stream up down -> STM up -> m ()
+streamSendDeferred (Stream channel) value = liftIO $ channelSendSimpleDeferred channel (encode <$> value)
+
+unsafeQueueStreamMessage :: (Binary up, MonadSTM m) => Stream up down -> up -> m ()
+unsafeQueueStreamMessage (Stream channel) value = liftSTM do
+  unsafeQueueChannelMessageSimple channel (encode value)
+
 streamSetHandler :: (Binary down, MonadIO m) => Stream up down -> (down -> QuasarIO ()) -> m ()
 streamSetHandler (Stream channel) handler = liftIO $ channelSetSimpleBinaryHandler channel handler
 
@@ -307,9 +316,9 @@ connectToServer server connection =
     formatException (fromException -> Just (ConnectionLost (ReceiveFailed (fromException -> Just EOF)))) =
       mconcat ["Client connection lost (", connection.description, ")"]
     formatException (fromException -> Just (ConnectionLost ex)) =
-      mconcat ["Client connection lost (", connection.description, "): ", (displayException ex)]
+      mconcat ["Client connection lost (", connection.description, "): ", displayException ex]
     formatException ex =
-      mconcat ["Client exception (", connection.description, "): ", (displayException ex)]
+      mconcat ["Client exception (", connection.description, "): ", displayException ex]
 
     logUntilDone :: Future () -> TQueue String -> QuasarIO ()
     logUntilDone done messageQueue =
diff --git a/src/Quasar/Network/Runtime/Observable.hs b/src/Quasar/Network/Runtime/Observable.hs
index e972970..cce2d3a 100644
--- a/src/Quasar/Network/Runtime/Observable.hs
+++ b/src/Quasar/Network/Runtime/Observable.hs
@@ -1,65 +1,124 @@
 module Quasar.Network.Runtime.Observable (
-  PackedObservableMessage,
-  newObservableStub,
+  PackedObservableState,
+  newObservableClient,
   observeToStream,
+  callRetrieve,
 ) where
 
 import Data.Binary (Binary)
 import Control.Monad.Catch
-import Quasar.Async
-import Quasar.Awaitable
-import Quasar.Disposable
+import Quasar
 import Quasar.Network.Exception
 import Quasar.Network.Multiplexer
 import Quasar.Network.Runtime
-import Quasar.Observable
 import Quasar.Prelude
-import Quasar.ResourceManager
 
-data PackedObservableMessage v
-  = PackedObservableUpdate v
+data PackedObservableState a
+  = PackedObservableValue a
   | PackedObservableLoading
   | PackedObservableNotAvailable PackedException
   deriving stock (Eq, Show, Generic)
   deriving anyclass (Binary)
 
-packObservableMessage :: ObservableMessage r -> PackedObservableMessage r
-packObservableMessage (ObservableUpdate x) = PackedObservableUpdate x
-packObservableMessage (ObservableLoading) = PackedObservableLoading
-packObservableMessage (ObservableNotAvailable ex) = PackedObservableNotAvailable (packException ex)
-
-unpackObservableMessage :: PackedObservableMessage r -> ObservableMessage r
-unpackObservableMessage (PackedObservableUpdate x) = ObservableUpdate x
-unpackObservableMessage (PackedObservableLoading) = ObservableLoading
-unpackObservableMessage (PackedObservableNotAvailable ex) = ObservableNotAvailable (unpackException ex)
-
-newObservableStub
-  :: forall v. Binary v
-  => (forall m. MonadIO m => m (Awaitable v))
-  -> (forall m. MonadIO m => m (Stream Void (PackedObservableMessage v)))
-  -> IO (Observable v)
-newObservableStub startRetrieveRequest startObserveRequest = pure uncachedObservable -- TODO cache
+packObservableState :: ObservableState r -> PackedObservableState r
+packObservableState (ObservableValue x) = PackedObservableValue x
+packObservableState ObservableLoading = PackedObservableLoading
+packObservableState (ObservableNotAvailable ex) = PackedObservableNotAvailable (packException ex)
+
+unpackObservableState :: PackedObservableState r -> ObservableState r
+unpackObservableState (PackedObservableValue x) = ObservableValue x
+unpackObservableState PackedObservableLoading = ObservableLoading
+unpackObservableState (PackedObservableNotAvailable ex) = ObservableNotAvailable (unpackException ex)
+
+data ObservableClient a =
+  ObservableClient {
+    quasar :: Quasar,
+    beginRetrieve :: IO (Future a),
+    createObservableStream :: IO (Stream Void (PackedObservableState a)),
+    observablePrim :: ObservablePrim a,
+    activeStreamVar :: TVar (Maybe (Stream Void (PackedObservableState a)))
+  }
+
+instance IsRetrievable a (ObservableClient a) where
+  -- TODO use withResourceScope to abort on async exception (once supported by the code generator)
+  retrieve client = liftIO $ client.beginRetrieve >>= await
+
+instance IsObservable a (ObservableClient a) where
+  observe client callback = liftQuasarSTM do
+    sub <- observe client.observablePrim callback
+    -- Register to clients quasar as well to ensure observers are unsubscribed when the client thread is cancelled
+    runQuasarSTM client.quasar $ registerResource sub
+    pure sub
+  pingObservable = undefined
+
+
+-- | Should be used in generated code (not implemented yet). Has to be run in context of the parent streams `Quasar`.
+newObservableClient
+  :: forall a. Binary a
+  => IO (Future a)
+  -> IO (Stream Void (PackedObservableState a))
+  -> QuasarIO (Observable a)
+newObservableClient beginRetrieve createObservableStream = do
+  quasar <- askQuasar
+  observablePrim <- newObservablePrimIO ObservableLoading
+  activeStreamVar <- newTVarIO Nothing
+  let client = ObservableClient {
+    quasar,
+    beginRetrieve,
+    createObservableStream,
+    observablePrim,
+    activeStreamVar
+  }
+  async_ $ manageObservableClient client
+  pure $ toObservable client
+
+manageObservableClient :: Binary a => ObservableClient a -> QuasarIO ()
+manageObservableClient ObservableClient{createObservableStream, observablePrim, activeStreamVar} = mask_ $ forever do
+  atomically $ check =<< observablePrimHasObservers observablePrim
+
+  stream <- liftIO createObservableStream
+  streamSetHandler stream handler
+  atomically $ writeTVar activeStreamVar (Just stream)
+
+  atomically $ check . not =<< observablePrimHasObservers observablePrim
+  mapM_ dispose =<< atomically (swapTVar activeStreamVar Nothing)
+  atomically $ setObservablePrim observablePrim ObservableLoading
   where
-    uncachedObservable :: Observable v
-    uncachedObservable = fnObservable observeFn retrieveFn
-    observeFn :: (ObservableMessage v -> ResourceManagerIO ()) -> ResourceManagerIO ()
-    observeFn callback = do
-      callback ObservableLoading
-      registerNewResource_ do
-        -- TODO send updates about the connection status
-        stream <- startObserveRequest
-        resourceManager <- askResourceManager
-        -- TODO FIXME enterResourceManager may fail if the resource manager is disposing (before the stream is disposed)
-        streamSetHandler stream (enterResourceManager resourceManager . callback . unpackObservableMessage)
-        pure $ toDisposable stream
-    retrieveFn :: ResourceManagerIO (Awaitable v)
-    retrieveFn = startRetrieveRequest
-
-observeToStream :: (Binary v, MonadResourceManager m, MonadIO m, MonadMask m) => Observable v -> Stream (PackedObservableMessage v) Void -> m ()
+    handler (unpackObservableState -> state) = atomically $ setObservablePrim observablePrim state
+
+-- | Used in generated code to call `retrieve`.
+callRetrieve :: Observable a -> QuasarIO (Future a)
+-- TODO LATER rewrite `retrieve` to keep backpressure across multiple hops
+callRetrieve x = toFuture <$> async (retrieve x)
+
+-- | Used in generated code to call `observe`.
+observeToStream :: forall a. Binary a => Observable a -> Stream (PackedObservableState a) Void -> QuasarIO ()
 observeToStream observable stream = do
-  localResourceManager stream do
-    observe observable \msg -> do
+  runQuasarIO (streamQuasar stream) do
+    -- Initial state is defined as loading, no extra message has to be sent
+    isLoading <- newTVarIO True
+    outbox <- newTVarIO Nothing
+    async_ $ liftIO $ sendThread isLoading outbox
+    quasarAtomically $ observe_ observable (callback isLoading outbox)
+
+  where
+    callback :: TVar Bool -> TVar (Maybe (ObservableState a)) -> ObservableCallback a
+    callback isLoading outbox state = liftSTM do
+      unlessM (readTVar isLoading) do
+        unsafeQueueStreamMessage stream PackedObservableLoading
+        writeTVar isLoading True
+      writeTVar outbox (Just state)
+
+    sendThread :: TVar Bool -> TVar (Maybe (ObservableState a)) -> IO ()
+    sendThread isLoading outbox = forever do
+      atomically $ check . isJust =<< readTVar outbox
       catch
-        -- TODO streamSend is blocking, but the callback should return immediately
-        do streamSend stream $ packObservableMessage msg
+        (streamSendDeferred stream payloadHook)
         \ChannelNotConnected -> pure ()
+      where
+        payloadHook :: STM (PackedObservableState a)
+        payloadHook = do
+          writeTVar isLoading False
+          swapTVar outbox Nothing >>= \case
+            Just state -> pure $ packObservableState state
+            Nothing -> unreachableCodePathM
diff --git a/src/Quasar/Network/TH.hs b/src/Quasar/Network/TH.hs
index 12b71cb..692638e 100644
--- a/src/Quasar/Network/TH.hs
+++ b/src/Quasar/Network/TH.hs
@@ -394,12 +394,12 @@ generateObservable api observable = pure Code {
     observableE ctx = [|$(varE serverImplFieldName) $(ctx.implRecordE)|]
     observableStubDec :: [Q Dec]
     observableStubDec = [
-      sigD (mkName observable.name) [t|$(clientType api) -> IO (Observable $(observable.ty))|],
+      sigD (mkName observable.name) [t|$(clientType api) -> QuasarIO (Observable $(observable.ty))|],
       do
         clientName <- newName "client"
         let clientE = varE clientName
         funD (mkName observable.name) [
-          clause [varP clientName] (normalB [|newObservableStub ($retrieveE $clientE) ($observeE $clientE)|]) []
+          clause [varP clientName] (normalB [|newObservableClient ($retrieveE $clientE) ($observeE $clientE)|]) []
           ]
       ]
     observeE :: Q Exp
diff --git a/test/Quasar/Network/MultiplexerSpec.hs b/test/Quasar/Network/MultiplexerSpec.hs
index db80365..5f4ceaa 100644
--- a/test/Quasar/Network/MultiplexerSpec.hs
+++ b/test/Quasar/Network/MultiplexerSpec.hs
@@ -1,12 +1,11 @@
 module Quasar.Network.MultiplexerSpec (spec) where
 
+import Control.Concurrent (threadDelay)
 import Control.Concurrent.Async (concurrently_)
 import Control.Concurrent.MVar
 import Control.Monad.Catch
-import Control.Monad.Reader (ReaderT)
 import Data.ByteString.Lazy qualified as BSL
 import Quasar
-import Quasar.MonadQuasar.Misc
 import Quasar.Network.Multiplexer
 import Quasar.Network.Connection
 import Quasar.Prelude
diff --git a/test/Quasar/NetworkSpec.hs b/test/Quasar/NetworkSpec.hs
index 6a729cb..b448a79 100644
--- a/test/Quasar/NetworkSpec.hs
+++ b/test/Quasar/NetworkSpec.hs
@@ -132,7 +132,7 @@ spec = parallel $ do
     it "can retrieve values" $ rm do
       var <- newObservableVarIO 42
       withStandaloneClient @ObservableExampleProtocol (ObservableExampleProtocolImpl (toObservable var)) $ \client -> do
-        observable <- liftIO $ intObservable client
+        observable <- intObservable client
         retrieve observable `shouldReturn` 42
         atomically $ setObservableVar var 13
         retrieve observable `shouldReturn` 13
@@ -142,8 +142,8 @@ spec = parallel $ do
 
       withStandaloneClient @ObservableExampleProtocol (ObservableExampleProtocolImpl (toObservable var)) $ \client -> do
 
-        resultVar <- liftIO $ newTVarIO ObservableLoading
-        observable <- liftIO $ intObservable client
+        resultVar <- newTVarIO ObservableLoading
+        observable <- intObservable client
 
         -- Change the value before calling `observe`
         atomically $ setObservableVar var 42
@@ -159,8 +159,8 @@ spec = parallel $ do
     it "receives continuous updates when observing" $ rm do
       var <- newObservableVarIO 42
       withStandaloneClient @ObservableExampleProtocol (ObservableExampleProtocolImpl (toObservable var)) $ \client -> do
-        resultVar <- liftIO $ newTVarIO ObservableLoading
-        observable <- liftIO $ intObservable client
+        resultVar <- newTVarIO ObservableLoading
+        observable <- intObservable client
 
         observeIO_ observable $ \msg -> writeTVar resultVar msg
 
@@ -168,7 +168,7 @@ spec = parallel $ do
               \case
                 -- Send and receive are running asynchronously, so this retries until the expected value is received.
                 -- Blocks forever if the wrong or no value is received.
-                ObservableValue x -> if (x == expected) then pure (pure ()) else retry
+                ObservableValue x -> if x == expected then pure (pure ()) else retry
                 ObservableLoading -> retry
                 ObservableNotAvailable ex -> pure $ throwIO ex
 
@@ -183,8 +183,8 @@ spec = parallel $ do
     it "receives no further updates after unsubscribing" $ rm do
       var <- newObservableVarIO 42
       withStandaloneClient @ObservableExampleProtocol (ObservableExampleProtocolImpl (toObservable var)) $ \client -> do
-        resultVar <- liftIO $ newTVarIO ObservableLoading
-        observable <- liftIO $ intObservable client
+        resultVar <- newTVarIO ObservableLoading
+        observable <- intObservable client
 
         disposer <- observeIO observable $ \msg -> writeTVar resultVar msg
 
-- 
GitLab