From 5bf801305f71ae7506805b01bf452ad8cec30fe1 Mon Sep 17 00:00:00 2001 From: Jochen Vothknecht <jochen3120@gmail.com> Date: Fri, 17 Dec 2021 06:52:48 +0100 Subject: [PATCH] Removing requests; taking Subscriptions from SpiderMQTT --- SiliconTorch/DecoratorTest.py | 1 - SiliconTorch/MQTT.py | 99 +++++++++++++++++++++++------------ 2 files changed, 66 insertions(+), 34 deletions(-) diff --git a/SiliconTorch/DecoratorTest.py b/SiliconTorch/DecoratorTest.py index f786f0b..e646809 100644 --- a/SiliconTorch/DecoratorTest.py +++ b/SiliconTorch/DecoratorTest.py @@ -1,6 +1,5 @@ - class MyDec: def __init__(*args, **kwargs): diff --git a/SiliconTorch/MQTT.py b/SiliconTorch/MQTT.py index 32f5cce..d746778 100644 --- a/SiliconTorch/MQTT.py +++ b/SiliconTorch/MQTT.py @@ -249,36 +249,73 @@ class Executor: pass # TODO: logging!!! -class SpiderMQTT: +# TODO: Move into SpiderMQTT class! +class Subscription: - class Request: + class ACTIVE: pass + class PENDING: pass + class CLEANUP: pass - def __init__(self, spider, requestTopic, responseTopic, payload, callback, pub_qos): - self.spider = spider - self.pub_qos = pub_qos - self.payload = payload - self.callback = callback - self.requestTopic = Topic(requestTopic) - self.responseTopic = Topic(responseTopic) + def __init__(self, spider, topic, callback=None, subscribeCallback=None): + self.spider = spider + self.topic = Topic(topic) + self.state = self.PENDING + self.callbacks = set() + self.subscribeCallbacks = set() - self.__msgCallback = self.onMessage - self.__subCallback = self.onSubscribe + if callback is not None: + self.addCallback(callback) - if self.requestTopic.containsWildcard(): - raise ValueError('requestTopic mustn\'t contain any wildcards') + if subscribeCallback is not None: + self.addSubscribeCallback(subscribeCallback) - if self.spider.isConnected(): - self.spider.addCallback(self.__msgCallback, self.__subCallback) - else: - pass # TODO: figure out what to do... + self.__sub() + + def __sub(self): + if self.state is self.PENDING and self.spider.isConnected(): + self.sub_mid = self.spider.mqtt.subscribe(self.topic, self.spider.sub_qos)[1] + + def __unsub(self): + pass # TODO: implement! + + def onSubscribe(self, mid): + print('onSubscribe[mid={}]: self.mid={} matching={}'.format(mid, self.sub_mid, self.sub_mid == mid)) + if hasattr(self, 'sub_mid') and self.sub_mid == mid: + self.state = self.ACTIVE + del self.sub_mid + Executor(self.subscribeCallbacks) + + def onMessage(self, msg): + if self.topic.compare(msg.topic): + Executor(self.callbacks, msg) + + def onConnect(self): + self.__sub() - def onSubscribe(self): - self.spider.publish(self.requestTopic, self.payload, qos=pub_qos, retain=False) + def onDisconnect(self): + self.state = self.PENDING - def onMessage(self, msg): - self.spider.removeCallback(self.__msgCallback, self.__subCallback) - self.spider.requests.remove(self) - Executor(self.callback, msg) + def addCallback(self, callback): + self.callbacks.add(callback) + + def removeCallback(self, callback): + if callback in self.callbacks: + self.callbacks.remove(callback) + if self.callbacks == set(): + self.state = self.CLEANUP + self.__unsub() + + def addSubscribeCallbacks(self, callback): + self.subscribeCallbacks.add(callback) + if self.state is self.ACTIVE: + Executor(callback) + + def removeSubscribeCallbacks(self, callback): + if callback in self.subscribeCallbacks: + self.subscribeCallbacks.remove(callback) + + +class SpiderMQTT: def __init__(self, broker: str, port: int = 1883, user: str = None, password: str = None, sub_qos: int = 0, will_topic: str = None, will_payload = None, will_qos: int = 0, will_retain: bool = False, backgroundTask: bool = True): @@ -286,7 +323,7 @@ class SpiderMQTT: self.sub_qos = sub_qos self.connected = False - self.requests = set() + # self.requests = set() self.subscriptions = {} self.pendingMessages = [] @@ -306,11 +343,11 @@ class SpiderMQTT: self.mqtt.will_set(will_topic, will_payload, will_qos, will_retain) # self.mqtt.on_log = self.__onLog - self.mqtt.on_message = self.__onMessage - self.mqtt.on_publish = self.__onPublish - self.mqtt.on_connect = self.__onConnect - self.mqtt.on_subscribe = self.__onSubscribe - self.mqtt.on_disconnect = self.__onDisconnect + self.mqtt.on_message = self.__onMessage + self.mqtt.on_publish = self.__onPublish + self.mqtt.on_connect = self.__onConnect + self.mqtt.on_subscribe = self.__onSubscribe + self.mqtt.on_disconnect = self.__onDisconnect self.mqtt.on_unsubscribe = self.__onUnSubscribe self.mqtt.connect(broker, port) @@ -352,10 +389,6 @@ class SpiderMQTT: self.pendingMessages += [msg] - def request(self, requestTopic, responseTopic, payload, callback, pub_qos=0): - self.requests.add(self.Request(self, requestTopic, responseTopic, payload, callback, pub_qos)) - # TODO: further actions needed???? - def __onMessage(self, _cl, _ud, msg): message = Message(msg.topic, msg.payload) -- GitLab