Files
thingy52-mqtt-bridge/device.py
2025-05-03 23:59:00 +01:00

362 lines
14 KiB
Python

from bluepy import btle, thingy52
import logging
from collections.abc import Callable
from time import sleep, time
from utils import Utils
import binascii
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class Device:
def __init__(self,
mac: str | None,
updateFrequency: int,
minReconnectDelay: int,
maxReconnectDelay: int,
reconnectAttempts: int,
reconnectDelayMultiplier: int,
disconnectCallback: Callable[[], None],
sensors: list[str],
temperatureOffset: int
):
self.__enabledSensors = sensors
self.__data = {
"battery": [None, []],
"temperature": [None, []],
"pressure": [None, []],
"humidity": [None, []],
"eco2": [None, []],
"tvoc": [None, []],
"button": [None, []],
"taps": [None, []],
"tapDirection": [None, []],
"orient": [None, []],
"stepCount": [None, []]
}
self.__reconnect = Utils.getReconnector(
self.connect,
"Thingy",
minReconnectDelay,
maxReconnectDelay,
reconnectAttempts,
reconnectDelayMultiplier,
disconnectCallback
)
self.__run = True
self.__temperatureOffset = temperatureOffset
logger.info("Initialising device")
self.__updateFrequency = updateFrequency
self.__disconnectCallback = disconnectCallback
self.__delegate = NotificationDelegate(self.__setData, self.__notifySubscribers)
while True:
try:
self.__mac = mac or Device.findMac()
self.connect()
break
except btle.BTLEDisconnectError or BTLEInternalError:
logging.warn("Unable to connect to Thingy, trying again")
logger.info("Finished device setup")
def start(self):
logging.debug("Starting reporting loop")
nextReport = time() + 2
batteryReportInterval = max(60, self.__updateFrequency)
nextBatteryReport = nextReport
connected = True
while self.__run:
try:
if (not connected):
self.__reconnect()
nextReport = time() + 2
connected = True
logging.debug("New reporting loop iteration")
if ("battery" in self.__enabledSensors and time() >= nextBatteryReport):
self.__setData("battery", self.__dev.battery.read())
nextBatteryReport = time() + max(60, self.__updateFrequency)
self.__dev.waitForNotifications(timeout=self.__updateFrequency)
if (time() >= nextReport):
logging.debug("Reporting sensor data")
for sensor in self.__data.keys():
if (sensor in self.__enabledSensors): self.__notifySubscribers(sensor)
nextReport = time() + self.__updateFrequency
except btle.BTLEDisconnectError or BTLEInternalError:
logging.warning("Thingy disconnected, reconnecting")
connected = False
def connect(self):
self.__dev = thingy52.Thingy52(self.__mac)
logging.info("Connected to device")
self.__dev.setDelegate(self.__delegate)
logging.debug("Configured notification handler")
self.__dev.ui.enable()
self.__dev.ui.set_led_mode_constant(1,0,1)
self.__enableSensors()
self.__enableNotifications()
def disconnect(self):
self.__run = False
self.__dev.disconnect()
def subscribe(self, sensor: str, callback: Callable[[int], None]):
if (sensor in self.__data.keys() and sensor in self.__enabledSensors):
self.__data[sensor][1].append(callback)
logging.debug(f"Current subscriptions for {sensor}: {repr(self.__data[sensor][1])}")
def unsubscribe(self, sensor: str, callback: Callable[[int], None]):
if (sensor in self.__data.keys() and sensor in self.__enabledSensors):
self.__data[sensor][1].remove(callback)
def __notifySubscribers(self, sensor: str):
if (sensor not in self.__data.keys()): return
data = self.__data[sensor][0]
logging.debug(f"Current {sensor}: {data}")
if (data != None):
for callback in self.__data[sensor][1]:
callback(data)
self.__data[sensor][0] = None
def __setData(self, sensor: str, val: str):
if (sensor in self.__data.keys()):
if (sensor == "temperature"):
self.__data[sensor][0] = float(val) + self.__temperatureOffset
else:
self.__data[sensor][0] = val
def __enableSensors(self):
logging.info("Enabling sensors")
# environment
if ("temperature" in self.__enabledSensors):
self.__dev.environment.enable()
self.__dev.environment.configure(temp_int=self.__updateFrequency * 1000)
logging.debug("Enabled temperature sensor")
if ("pressure" in self.__enabledSensors):
self.__dev.environment.enable()
self.__dev.environment.configure(press_int=self.__updateFrequency * 1000)
logging.debug("Enabled pressure sensor")
if ("humidity" in self.__enabledSensors):
self.__dev.environment.enable()
self.__dev.environment.configure(humid_int=self.__updateFrequency * 1000)
logging.debug("Enabled humidity sensor")
if ("eco2" in self.__enabledSensors or "tvoc" in self.__enabledSensors):
self.__dev.environment.enable()
self.__dev.environment.configure(gas_mode_int=1)
logging.debug("Enabled gas sensors")
if ("colour" in self.__enabledSensors):
self.__dev.environment.enable()
self.__dev.environment.configure(color_int=self.__updateFrequency * 1000)
self.__dev.environment.configure(color_sens_calib=[0,0,0])
logging.debug("Enabled colour sensor")
# UI
if ("button" in self.__enabledSensors):
self.__dev.ui.enable()
logging.debug("Enabled button sensor")
if ("battery" in self.__enabledSensors):
self.__dev.battery.enable()
logging.debug("Enabled battery sensor")
# motion
if ("taps" in self.__enabledSensors):
self.__dev.motion.enable()
self.__dev.motion.configure(motion_freq=200)
logging.debug("Enabled taps sensor")
if ("orient" in self.__enabledSensors):
self.__dev.motion.enable()
logging.debug("Enabled orient sensor")
if ("stepCount" in self.__enabledSensors):
self.__dev.motion.enable()
self.__dev.motion.configure(step_int=self.__updateFrequency * 1000)
logging.debug("Enabled stepCount sensor")
logging.info("Enabled sensors")
def __enableNotifications(self):
logging.info("Enabling notifications")
# environment
if ("temperature" in self.__enabledSensors):
self.__dev.environment.set_temperature_notification(True)
logging.debug("Enabled temperature sensor")
if ("pressure" in self.__enabledSensors):
self.__dev.environment.set_pressure_notification(True)
logging.debug("Enabled pressure sensor")
if ("humidity" in self.__enabledSensors):
self.__dev.environment.set_humidity_notification(True)
logging.debug("Enabled humidity sensor")
if ("eco2" in self.__enabledSensors or "tvoc" in self.__enabledSensors):
self.__dev.environment.set_gas_notification(True)
logging.debug("Enabled gas sensors")
if ("colour" in self.__enabledSensors):
self.__dev.environment.set_color_notification(True)
logging.debug("Enabled colour sensor")
# UI
if ("button" in self.__enabledSensors):
self.__dev.ui.set_btn_notification(True)
logging.debug("Enabled button sensor")
# motion
if ("taps" in self.__enabledSensors):
self.__dev.motion.set_tap_notification(True)
logging.debug("Enabled taps sensor")
if ("orient" in self.__enabledSensors):
self.__dev.motion.set_orient_notification(True)
logging.debug("Enabled orient sensor")
if ("stepCount" in self.__enabledSensors):
self.__dev.motion.set_stepcnt_notification(True)
logging.debug("Enabled stepCount sensor")
logging.info("Enabled notifications")
@staticmethod
def findMac():
while True:
logger.info("Scanning for BLE devices...")
for device in btle.Scanner().scan(timeout=5):
logger.info(f"Found device with MAC {device.addr} with RSSI ${device.rssi}. Checking if it's a Thingy...")
for (_, _, value) in device.getScanData():
if (value == "Thingy"):
logger.info("... this device is a Thingy")
return device.addr
class NotificationDelegate(btle.DefaultDelegate):
    """bluepy delegate that decodes Thingy notifications.

    Each notification is matched against the characteristic handles exposed
    by the ``thingy52`` module, decoded, and passed to the ``setData``
    callback under a sensor name. Button events additionally trigger
    ``notifySubscribers`` straight away.
    """

    # Tap direction codes as delivered in the first byte of a tap notification.
    _TAP_DIRECTIONS = {
        1: "Bottom",
        2: "Left",
        3: "Top",
        4: "Right",
        5: "Front",
        6: "Back",
    }

    # Orientation codes:
    # 1 = led top left
    # 2 = led top right / left side up
    # 3 = led bottom right/bottom up
    # 0 = led bottom left/ right side up
    _ORIENTATIONS = {
        0: "LED Bottom Left",
        1: "LED Top Left",
        2: "LED Top Right",
        3: "LED Bottom Right",
    }

    def __init__(self, setData: Callable[[str, str], None], notifySubscribers: Callable[[str], None]):
        """Remember the callbacks used to publish decoded values.

        Args:
            setData: Called as ``setData(sensorName, value)`` for every
                decoded notification.
            notifySubscribers: Called as ``notifySubscribers(sensorName)``
                when a value should be pushed immediately.
        """
        super().__init__()
        self.__setData = setData
        self.__notifySubscribers = notifySubscribers

    def handleNotification(self, hnd, data):
        """Decode one BLE notification and forward the resulting value(s).

        Args:
            hnd: Handle of the characteristic that sent the notification.
            data: Raw notification payload (bytes-like).
        """
        logger.debug(f"Handling notification: {repr(hnd)}")
        if hnd == thingy52.e_temperature_handle:
            teptep = binascii.b2a_hex(data)
            # Last byte is the fractional part in hundredths; the rest is the
            # integer part.
            self.__setData("temperature", Utils.hexStrToInt(teptep[:-2]) + int(teptep[-2:], 16) / 100.0)
        elif hnd == thingy52.e_pressure_handle:
            pressure_int, pressure_dec = self.__extractPressureData(data)
            self.__setData("pressure", pressure_int + pressure_dec / 100.0)
        elif hnd == thingy52.e_humidity_handle:
            teptep = binascii.b2a_hex(data)
            self.__setData("humidity", Utils.hexStrToInt(teptep))
        elif hnd == thingy52.e_gas_handle:
            eco2, tvoc = self.__extractGasData(data)
            logger.debug(f"Handling gas sensor update: {eco2}, {tvoc}")
            self.__setData("eco2", eco2)
            self.__setData("tvoc", tvoc)
        elif hnd == thingy52.e_color_handle:
            red, green, blue, _clear = self.__extractColourData(data)
            self.__setData("colour", "0x%0.2X%0.2X%0.2X" % (red, green, blue))
        elif hnd == thingy52.ui_button_handle:
            teptep = binascii.b2a_hex(data)
            self.__setData("button", "on" if Utils.hexStrToInt(teptep) == 1 else "off")  # 1 = pressed, 0 = released
            # Button presses are pushed immediately rather than waiting for
            # the next periodic report.
            self.__notifySubscribers("button")
        elif hnd == thingy52.m_tap_handle:
            direction, count = self.__extractTapData(data)
            tapDirection = self._TAP_DIRECTIONS.get(direction, f"Unknown, {direction}")
            self.__setData("tapDirection", tapDirection)
            self.__setData("taps", count)
        elif hnd == thingy52.m_orient_handle:
            teptep = binascii.b2a_hex(data)
            value = Utils.hexStrToInt(teptep)
            orientation = self._ORIENTATIONS.get(value, f"Unknown, {value}")
            self.__setData("orient", orientation)
        elif hnd == thingy52.m_stepcnt_handle:
            logger.info("Handling step count update")
            teptep = binascii.b2a_hex(data)
            self.__setData("stepCount", Utils.hexStrToInt(teptep))
        elif hnd == thingy52.m_rotation_handle:
            teptep = binascii.b2a_hex(data)
            self.__setData("rotation", Utils.hexStrToInt(teptep))
        elif hnd == thingy52.m_heading_handle:
            teptep = binascii.b2a_hex(data)
            self.__setData("heading", Utils.hexStrToInt(teptep))
        elif hnd == thingy52.m_gravity_handle:
            teptep = binascii.b2a_hex(data)
            self.__setData("gravity", Utils.hexStrToInt(teptep))
        elif hnd == thingy52.s_speaker_status_handle:
            teptep = binascii.b2a_hex(data)
            self.__setData("speaker", Utils.hexStrToInt(teptep))
        elif hnd == thingy52.s_microphone_handle:
            # Microphone notifications are recognised but not forwarded.
            pass
        else:
            logger.warning(f"Unknown notification: {hnd}, {data}")

    def __extractPressureData(self, data):
        """Extract pressure data from the notification payload.

        Returns:
            ``(pressure_int, pressure_dec)``: the first four bytes combined
            as a little-endian unsigned integer, and the last byte.
        """
        teptep = binascii.b2a_hex(data)
        pressure_int = 0
        for i in range(0, 4):
            pressure_int += (int(teptep[i*2:(i*2)+2], 16) << 8*i)
        pressure_dec = int(teptep[-2:], 16)
        return (pressure_int, pressure_dec)

    def __extractGasData(self, data):
        """Extract gas data from the notification payload.

        Returns:
            ``(eco2, tvoc)``: bytes 0-1 and bytes 2-3, each combined as a
            little-endian unsigned integer.
        """
        teptep = binascii.b2a_hex(data)
        eco2 = int(teptep[:2], 16) + (int(teptep[2:4], 16) << 8)
        tvoc = int(teptep[4:6], 16) + (int(teptep[6:8], 16) << 8)
        return eco2, tvoc

    def __extractColourData(self, data):
        """Extract colour data from the notification payload.

        Returns:
            ``(red, green, blue, clear)``: the first four bytes of the
            payload, one byte per channel.
        """
        # NOTE(review): this reads one byte per channel. Nordic's firmware
        # documentation appears to describe four 16-bit values; confirm
        # against the characteristic format before relying on these numbers.
        teptep = binascii.b2a_hex(data)
        red = int(teptep[:2], 16)
        green = int(teptep[2:4], 16)
        blue = int(teptep[4:6], 16)
        clear = int(teptep[6:8], 16)
        return red, green, blue, clear

    def __extractTapData(self, data):
        """Extract tap data from the notification payload.

        Returns:
            ``(direction, count)``: the first and second payload bytes.
        """
        teptep = binascii.b2a_hex(data)
        # The digits are hexadecimal, so base 16 is required; parsing them as
        # decimal fails for any byte of 0x0a or above.
        direction = int(teptep[0:2], 16)
        count = int(teptep[2:4], 16)
        return (direction, count)