Files
thingy52-mqtt-bridge/mqtt.py
2025-05-03 23:59:00 +01:00

82 lines
2.6 KiB
Python

from paho.mqtt import client as mqtt_client
import logging
from sys import exit
from time import sleep
from collections.abc import Callable
from utils import Utils
logger = logging.getLogger(__name__)
class MQTTConnection:
def __init__(self,
broker: str,
port: int,
baseTopic: str,
clientId: str,
username: str | None,
password: str | None,
caCertPath: str | None,
certPath: str | None,
keyPath: str | None,
minReconnectDelay: int,
maxReconnectDelay: int,
reconnectAttempts: int,
reconnectDelayMultiplier: int,
disconnectCallback: Callable[[], None]
):
self.__broker = broker
self.__port = port
self.__baseTopic = baseTopic
self.__clientId = clientId
self.__client = mqtt_client.Client(self.__clientId)
if (username and password):
self.__client.username_pw_set(username, password)
self.__client.tls_set(ca_certs=caCertPath, certfile=certPath, keyfile=keyPath)
self.__client.on_connect = self.__onConnect
self.__disconnectCallback = disconnectCallback
self.__client.on_disconnect = Utils.getReconnector(
self.__client.reconnect,
"MQTT broker",
minReconnectDelay,
maxReconnectDelay,
reconnectAttempts,
reconnectDelayMultiplier,
self.__disconnectCallback
)
self.__connect()
self.__client.loop_start()
def __onConnect(self, client, userdata, flags, rc):
if (rc == 0):
logger.info("Connected to MQTT broker")
else:
logger.error("Unable to connect to MQTT broker, exiting")
self.__disconnectCallback()
exit()
def publish(self, subTopic: str, message: str) -> int:
topic = self.__baseTopic + "/" + subTopic
logging.debug(f"Sending MQTT payload to topic {topic}: {message}")
return self.__client.publish(topic, message)[0]
def subscribe(self, subTopic: str, callback: Callable[[str], None], prefixBaseTopic: bool = True) -> None:
def onMessage(client, userdata, msg):
callback(msg.payload.decode())
topic = self.__baseTopic + "/" + subTopic if prefixBaseTopic else subTopic
self.__client.subscribe(topic)
self.__client.on_message = onMessage
def disconnect(self):
self.__client.loop_stop()
self.publish("available", "offline")
def __connect(self):
self.__client.connect(self.__broker, self.__port)