82 lines
2.6 KiB
Python
82 lines
2.6 KiB
Python
from paho.mqtt import client as mqtt_client
|
|
import logging
|
|
from sys import exit
|
|
from time import sleep
|
|
from collections.abc import Callable
|
|
from utils import Utils
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class MQTTConnection:
|
|
def __init__(self,
|
|
broker: str,
|
|
port: int,
|
|
baseTopic: str,
|
|
clientId: str,
|
|
username: str | None,
|
|
password: str | None,
|
|
caCertPath: str | None,
|
|
certPath: str | None,
|
|
keyPath: str | None,
|
|
minReconnectDelay: int,
|
|
maxReconnectDelay: int,
|
|
reconnectAttempts: int,
|
|
reconnectDelayMultiplier: int,
|
|
disconnectCallback: Callable[[], None]
|
|
):
|
|
self.__broker = broker
|
|
self.__port = port
|
|
self.__baseTopic = baseTopic
|
|
self.__clientId = clientId
|
|
self.__client = mqtt_client.Client(self.__clientId)
|
|
|
|
if (username and password):
|
|
self.__client.username_pw_set(username, password)
|
|
self.__client.tls_set(ca_certs=caCertPath, certfile=certPath, keyfile=keyPath)
|
|
self.__client.on_connect = self.__onConnect
|
|
|
|
self.__disconnectCallback = disconnectCallback
|
|
|
|
self.__client.on_disconnect = Utils.getReconnector(
|
|
self.__client.reconnect,
|
|
"MQTT broker",
|
|
minReconnectDelay,
|
|
maxReconnectDelay,
|
|
reconnectAttempts,
|
|
reconnectDelayMultiplier,
|
|
self.__disconnectCallback
|
|
)
|
|
|
|
self.__connect()
|
|
|
|
self.__client.loop_start()
|
|
|
|
def __onConnect(self, client, userdata, flags, rc):
|
|
if (rc == 0):
|
|
logger.info("Connected to MQTT broker")
|
|
else:
|
|
logger.error("Unable to connect to MQTT broker, exiting")
|
|
self.__disconnectCallback()
|
|
exit()
|
|
|
|
def publish(self, subTopic: str, message: str) -> int:
|
|
topic = self.__baseTopic + "/" + subTopic
|
|
logging.debug(f"Sending MQTT payload to topic {topic}: {message}")
|
|
return self.__client.publish(topic, message)[0]
|
|
|
|
def subscribe(self, subTopic: str, callback: Callable[[str], None], prefixBaseTopic: bool = True) -> None:
|
|
def onMessage(client, userdata, msg):
|
|
callback(msg.payload.decode())
|
|
|
|
topic = self.__baseTopic + "/" + subTopic if prefixBaseTopic else subTopic
|
|
self.__client.subscribe(topic)
|
|
self.__client.on_message = onMessage
|
|
|
|
def disconnect(self):
|
|
self.__client.loop_stop()
|
|
self.publish("available", "offline")
|
|
|
|
def __connect(self):
|
|
self.__client.connect(self.__broker, self.__port)
|
|
|