Skip to content
Snippets Groups Projects
Commit 5bf80130 authored by fxk8y's avatar fxk8y :spider:
Browse files

Removing requests; taking Subscriptions from SpiderMQTT

parent 409dcde0
No related branches found
No related tags found
No related merge requests found
class MyDec:
def __init__(*args, **kwargs):
......
......@@ -249,36 +249,73 @@ class Executor:
pass # TODO: logging!!!
class SpiderMQTT:
# TODO: Move into SpiderMQTT class!
class Subscription:
class Request:
class ACTIVE: pass
class PENDING: pass
class CLEANUP: pass
def __init__(self, spider, requestTopic, responseTopic, payload, callback, pub_qos):
self.spider = spider
self.pub_qos = pub_qos
self.payload = payload
self.callback = callback
self.requestTopic = Topic(requestTopic)
self.responseTopic = Topic(responseTopic)
def __init__(self, spider, topic, callback=None, subscribeCallback=None):
self.spider = spider
self.topic = Topic(topic)
self.state = self.PENDING
self.callbacks = set()
self.subscribeCallbacks = set()
self.__msgCallback = self.onMessage
self.__subCallback = self.onSubscribe
if callback is not None:
self.addCallback(callback)
if self.requestTopic.containsWildcard():
raise ValueError('requestTopic mustn\'t contain any wildcards')
if subscribeCallback is not None:
self.addSubscribeCallback(subscribeCallback)
if self.spider.isConnected():
self.spider.addCallback(self.__msgCallback, self.__subCallback)
else:
pass # TODO: figure out what to do...
self.__sub()
def __sub(self):
if self.state is self.PENDING and self.spider.isConnected():
self.sub_mid = self.spider.mqtt.subscribe(self.topic, self.spider.sub_qos)[1]
def __unsub(self):
pass # TODO: implement!
def onSubscribe(self, mid):
print('onSubscribe[mid={}]: self.mid={} matching={}'.format(mid, self.sub_mid, self.sub_mid == mid))
if hasattr(self, 'sub_mid') and self.sub_mid == mid:
self.state = self.ACTIVE
del self.sub_mid
Executor(self.subscribeCallbacks)
def onMessage(self, msg):
if self.topic.compare(msg.topic):
Executor(self.callbacks, msg)
def onConnect(self):
self.__sub()
def onSubscribe(self):
self.spider.publish(self.requestTopic, self.payload, qos=pub_qos, retain=False)
def onDisconnect(self):
self.state = self.PENDING
def onMessage(self, msg):
self.spider.removeCallback(self.__msgCallback, self.__subCallback)
self.spider.requests.remove(self)
Executor(self.callback, msg)
def addCallback(self, callback):
self.callbacks.add(callback)
def removeCallback(self, callback):
if callback in self.callbacks:
self.callbacks.remove(callback)
if self.callbacks == set():
self.state = self.CLEANUP
self.__unsub()
def addSubscribeCallbacks(self, callback):
self.subscribeCallbacks.add(callback)
if self.state is self.ACTIVE:
Executor(callback)
def removeSubscribeCallbacks(self, callback):
if callback in self.subscribeCallbacks:
self.subscribeCallbacks.remove(callback)
class SpiderMQTT:
def __init__(self, broker: str, port: int = 1883, user: str = None, password: str = None, sub_qos: int = 0,
will_topic: str = None, will_payload = None, will_qos: int = 0, will_retain: bool = False, backgroundTask: bool = True):
......@@ -286,7 +323,7 @@ class SpiderMQTT:
self.sub_qos = sub_qos
self.connected = False
self.requests = set()
# self.requests = set()
self.subscriptions = {}
self.pendingMessages = []
......@@ -306,11 +343,11 @@ class SpiderMQTT:
self.mqtt.will_set(will_topic, will_payload, will_qos, will_retain)
# self.mqtt.on_log = self.__onLog
self.mqtt.on_message = self.__onMessage
self.mqtt.on_publish = self.__onPublish
self.mqtt.on_connect = self.__onConnect
self.mqtt.on_subscribe = self.__onSubscribe
self.mqtt.on_disconnect = self.__onDisconnect
self.mqtt.on_message = self.__onMessage
self.mqtt.on_publish = self.__onPublish
self.mqtt.on_connect = self.__onConnect
self.mqtt.on_subscribe = self.__onSubscribe
self.mqtt.on_disconnect = self.__onDisconnect
self.mqtt.on_unsubscribe = self.__onUnSubscribe
self.mqtt.connect(broker, port)
......@@ -352,10 +389,6 @@ class SpiderMQTT:
self.pendingMessages += [msg]
def request(self, requestTopic, responseTopic, payload, callback, pub_qos=0):
self.requests.add(self.Request(self, requestTopic, responseTopic, payload, callback, pub_qos))
# TODO: further actions needed????
def __onMessage(self, _cl, _ud, msg):
message = Message(msg.topic, msg.payload)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment