"""Thingy:52 BLE wrapper: connects via bluepy and relays sensor readings to subscribers."""

import binascii
import logging
from collections.abc import Callable
from time import sleep, time

from bluepy import btle, thingy52

from utils import Utils

logger = logging.getLogger(__name__)

# bluepy errors that are treated as "the connection was lost, try again".
# NOTE: the original code wrote `except A or B`, which only ever catches A.
_CONNECTION_ERRORS = (btle.BTLEDisconnectError, btle.BTLEInternalError)

# Keys of the per-sensor table kept by Device. Readings pushed for any other
# key are silently dropped by Device.__setData.
# NOTE(review): "colour", "rotation", "heading", "gravity" and "speaker" are
# produced by NotificationDelegate but are not listed here, so they are
# currently discarded - confirm whether that is intended.
_SENSOR_NAMES = (
    "battery",
    "temperature",
    "pressure",
    "humidity",
    "eco2",
    "tvoc",
    "button",
    "taps",
    "tapDirection",
    "orient",
    "stepCount",
)


class Device:
    """Connection to a single Thingy that caches readings and notifies subscribers.

    Each entry of the internal table is ``[latest_value, [callbacks]]``.
    The latest value is reset to ``None`` once it has been delivered.
    """

    def __init__(self,
        mac: str | None,
        updateFrequency: int,
        minReconnectDelay: int,
        maxReconnectDelay: int,
        reconnectAttempts: int,
        reconnectDelayMultiplier: int,
        disconnectCallback: Callable[[], None],
        sensors: list[str],
        temperatureOffset: int
    ):
        """Store the configuration and block until a connection is established.

        Args:
            mac: Address of the device; when falsy, ``Device.findMac`` is used
                to scan for one.
            updateFrequency: Reporting period in seconds; it is also passed
                (multiplied by 1000) as the sensor sampling interval.
            minReconnectDelay: Forwarded to ``Utils.getReconnector``.
            maxReconnectDelay: Forwarded to ``Utils.getReconnector``.
            reconnectAttempts: Forwarded to ``Utils.getReconnector``.
            reconnectDelayMultiplier: Forwarded to ``Utils.getReconnector``.
            disconnectCallback: Forwarded to ``Utils.getReconnector``.
            sensors: Names of the sensors to enable and report.
            temperatureOffset: Value added to every temperature reading.
        """
        self.__enabledSensors = sensors
        self.__data = {name: [None, []] for name in _SENSOR_NAMES}
        self.__reconnect = Utils.getReconnector(
            self.connect,
            "Thingy",
            minReconnectDelay,
            maxReconnectDelay,
            reconnectAttempts,
            reconnectDelayMultiplier,
            disconnectCallback
        )
        self.__run = True
        self.__temperatureOffset = temperatureOffset
        logger.info("Initialising device")
        self.__updateFrequency = updateFrequency
        self.__disconnectCallback = disconnectCallback
        self.__delegate = NotificationDelegate(self.__setData, self.__notifySubscribers)

        # Retry until the first connection succeeds.
        while True:
            try:
                self.__mac = mac or Device.findMac()
                self.connect()
                break
            except _CONNECTION_ERRORS:
                logger.warning("Unable to connect to Thingy, trying again")
        logger.info("Finished device setup")

    def start(self):
        """Run the reporting loop until ``disconnect`` clears the run flag.

        Every iteration waits for notifications for up to ``updateFrequency``
        seconds, then pushes the cached value of each enabled sensor to its
        subscribers once the report deadline has passed. The battery level is
        polled at most every ``max(60, updateFrequency)`` seconds. When the
        connection is lost, the reconnector is invoked on the next iteration.
        """
        logger.debug("Starting reporting loop")
        nextReport = time() + 2
        batteryReportInterval = max(60, self.__updateFrequency)
        nextBatteryReport = nextReport
        connected = True
        while self.__run:
            try:
                if not connected:
                    self.__reconnect()
                    nextReport = time() + 2
                    connected = True
                logger.debug("New reporting loop iteration")
                if "battery" in self.__enabledSensors and time() >= nextBatteryReport:
                    self.__setData("battery", self.__dev.battery.read())
                    nextBatteryReport = time() + batteryReportInterval
                self.__dev.waitForNotifications(timeout=self.__updateFrequency)
                if time() >= nextReport:
                    logger.debug("Reporting sensor data")
                    for sensor in self.__data:
                        if sensor in self.__enabledSensors:
                            self.__notifySubscribers(sensor)
                    nextReport = time() + self.__updateFrequency
            except _CONNECTION_ERRORS:
                logger.warning("Thingy disconnected, reconnecting")
                connected = False

    def connect(self):
        """Open the connection, install the delegate and enable sensors and notifications."""
        self.__dev = thingy52.Thingy52(self.__mac)
        logger.info("Connected to device")
        self.__dev.setDelegate(self.__delegate)
        logger.debug("Configured notification handler")
        self.__dev.ui.enable()
        self.__dev.ui.set_led_mode_constant(1, 0, 1)
        self.__enableSensors()
        self.__enableNotifications()

    def disconnect(self):
        """Stop the reporting loop and close the connection."""
        self.__run = False
        self.__dev.disconnect()

    def subscribe(self, sensor: str, callback: Callable[[int], None]):
        """Register ``callback`` for ``sensor``; ignored if the sensor is unknown or not enabled."""
        if sensor in self.__data and sensor in self.__enabledSensors:
            callbacks = self.__data[sensor][1]
            callbacks.append(callback)
            logger.debug("Current subscriptions for %s: %r", sensor, callbacks)

    def unsubscribe(self, sensor: str, callback: Callable[[int], None]):
        """Remove ``callback`` from ``sensor``; ignored if the sensor is unknown or not enabled.

        Raises:
            ValueError: If ``callback`` is not currently subscribed to ``sensor``.
        """
        if sensor in self.__data and sensor in self.__enabledSensors:
            self.__data[sensor][1].remove(callback)

    def __notifySubscribers(self, sensor: str):
        """Deliver the cached value of ``sensor`` to its callbacks, then clear it.

        Nothing is delivered when no value has been stored since the last
        delivery (the cached value is ``None``).
        """
        if sensor not in self.__data:
            return
        entry = self.__data[sensor]
        data = entry[0]
        logger.debug("Current %s: %s", sensor, data)
        if data is not None:
            for callback in entry[1]:
                callback(data)
            entry[0] = None

    def __setData(self, sensor: str, val: str):
        """Cache ``val`` for ``sensor``; unknown sensor names are ignored.

        Temperature readings are converted to ``float`` and shifted by the
        configured temperature offset.
        """
        if sensor not in self.__data:
            return
        if sensor == "temperature":
            self.__data[sensor][0] = float(val) + self.__temperatureOffset
        else:
            self.__data[sensor][0] = val

    def __enableSensors(self):
        """Enable and configure the services behind every enabled sensor."""
        logger.info("Enabling sensors")
        enabled = self.__enabledSensors
        intervalMs = self.__updateFrequency * 1000
        environment = self.__dev.environment
        motion = self.__dev.motion

        # environment
        if "temperature" in enabled:
            environment.enable()
            environment.configure(temp_int=intervalMs)
            logger.debug("Enabled temperature sensor")
        if "pressure" in enabled:
            environment.enable()
            environment.configure(press_int=intervalMs)
            logger.debug("Enabled pressure sensor")
        if "humidity" in enabled:
            environment.enable()
            environment.configure(humid_int=intervalMs)
            logger.debug("Enabled humidity sensor")
        if "eco2" in enabled or "tvoc" in enabled:
            environment.enable()
            environment.configure(gas_mode_int=1)
            logger.debug("Enabled gas sensors")
        if "colour" in enabled:
            environment.enable()
            environment.configure(color_int=intervalMs)
            environment.configure(color_sens_calib=[0, 0, 0])
            logger.debug("Enabled colour sensor")

        # UI
        if "button" in enabled:
            self.__dev.ui.enable()
            logger.debug("Enabled button sensor")
        if "battery" in enabled:
            self.__dev.battery.enable()
            logger.debug("Enabled battery sensor")

        # motion
        if "taps" in enabled:
            motion.enable()
            motion.configure(motion_freq=200)
            logger.debug("Enabled taps sensor")
        if "orient" in enabled:
            motion.enable()
            logger.debug("Enabled orient sensor")
        if "stepCount" in enabled:
            motion.enable()
            motion.configure(step_int=intervalMs)
            logger.debug("Enabled stepCount sensor")
        logger.info("Enabled sensors")

    def __enableNotifications(self):
        """Turn on notifications for every enabled sensor."""
        logger.info("Enabling notifications")
        enabled = self.__enabledSensors
        environment = self.__dev.environment
        motion = self.__dev.motion

        # environment
        if "temperature" in enabled:
            environment.set_temperature_notification(True)
            logger.debug("Enabled temperature sensor")
        if "pressure" in enabled:
            environment.set_pressure_notification(True)
            logger.debug("Enabled pressure sensor")
        if "humidity" in enabled:
            environment.set_humidity_notification(True)
            logger.debug("Enabled humidity sensor")
        if "eco2" in enabled or "tvoc" in enabled:
            environment.set_gas_notification(True)
            logger.debug("Enabled gas sensors")
        if "colour" in enabled:
            environment.set_color_notification(True)
            logger.debug("Enabled colour sensor")

        # UI
        if "button" in enabled:
            self.__dev.ui.set_btn_notification(True)
            logger.debug("Enabled button sensor")

        # motion
        if "taps" in enabled:
            motion.set_tap_notification(True)
            logger.debug("Enabled taps sensor")
        if "orient" in enabled:
            motion.set_orient_notification(True)
            logger.debug("Enabled orient sensor")
        if "stepCount" in enabled:
            motion.set_stepcnt_notification(True)
            logger.debug("Enabled stepCount sensor")
        logger.info("Enabled notifications")

    @staticmethod
    def findMac():
        """Scan repeatedly until a device advertising the value "Thingy" is found.

        Returns:
            The address of the first matching device. The call does not
            return until such a device is seen.
        """
        while True:
            logger.info("Scanning for BLE devices...")
            for device in btle.Scanner().scan(timeout=5):
                logger.info(f"Found device with MAC {device.addr} with RSSI {device.rssi}. Checking if it's a Thingy...")
                for (_, _, value) in device.getScanData():
                    if value == "Thingy":
                        logger.info("... this device is a Thingy")
                        return device.addr


class NotificationDelegate(btle.DefaultDelegate):
    """bluepy delegate that decodes notifications and forwards them via callbacks."""

    # Tap direction codes mapped to the labels reported to subscribers.
    _TAP_DIRECTIONS = {
        1: "Bottom",
        2: "Left",
        3: "Top",
        4: "Right",
        5: "Front",
        6: "Back",
    }

    # Orientation codes mapped to the labels reported to subscribers.
    _ORIENTATIONS = {
        0: "LED Bottom Left",   # 0 = led bottom left/ right side up
        1: "LED Top Left",      # 1 = led top left
        2: "LED Top Right",     # 2 = led top right / left side up
        3: "LED Bottom Right",  # 3 = led bottom right/bottom up
    }

    def __init__(self, setData: Callable[[str, str], None], notifySubscribers: Callable[[str], None]):
        """Remember the callbacks used to store a reading and to push it immediately.

        Args:
            setData: Called as ``setData(sensor, value)`` for every decoded reading.
            notifySubscribers: Called as ``notifySubscribers(sensor)`` when a
                reading should be delivered right away (button presses).
        """
        super().__init__()
        self.__setData = setData
        self.__notifySubscribers = notifySubscribers

    def handleNotification(self, hnd, data):
        """Decode the notification received on handle ``hnd`` and store the reading.

        The handle is compared against the handle globals of the ``thingy52``
        module at call time. Button events are additionally pushed to
        subscribers immediately instead of waiting for the next report.
        """
        logger.debug("Handling notification: %r", hnd)
        if hnd == thingy52.e_temperature_handle:
            hexPayload = binascii.b2a_hex(data)
            # Last byte holds the hundredths, everything before it the integer part.
            self.__setData("temperature", Utils.hexStrToInt(hexPayload[:-2]) + int(hexPayload[-2:], 16) / 100.0)
        elif hnd == thingy52.e_pressure_handle:
            pressure_int, pressure_dec = self.__extractPressureData(data)
            self.__setData("pressure", pressure_int + pressure_dec / 100.0)
        elif hnd == thingy52.e_humidity_handle:
            self.__setData("humidity", Utils.hexStrToInt(binascii.b2a_hex(data)))
        elif hnd == thingy52.e_gas_handle:
            eco2, tvoc = self.__extractGasData(data)
            logger.debug("Handling gas sensor update: %s, %s", eco2, tvoc)
            self.__setData("eco2", eco2)
            self.__setData("tvoc", tvoc)
        elif hnd == thingy52.e_color_handle:
            red, green, blue, _ = self.__extractColourData(data)
            self.__setData("colour", "0x%0.2X%0.2X%0.2X" %(red, green, blue))
        elif hnd == thingy52.ui_button_handle:
            pressed = Utils.hexStrToInt(binascii.b2a_hex(data)) == 1  # 1 = pressed, 0 = released
            self.__setData("button", "on" if pressed else "off")
            self.__notifySubscribers("button")
        elif hnd == thingy52.m_tap_handle:
            direction, count = self.__extractTapData(data)
            tapDirection = self._TAP_DIRECTIONS.get(direction, f"Unknown, {direction}")
            self.__setData("tapDirection", tapDirection)
            self.__setData("taps", count)
        elif hnd == thingy52.m_orient_handle:
            value = Utils.hexStrToInt(binascii.b2a_hex(data))
            orientation = self._ORIENTATIONS.get(value, f"Unknown, {value}")
            self.__setData("orient", orientation)
        elif hnd == thingy52.m_stepcnt_handle:
            logger.info("Handling step count update")
            self.__setData("stepCount", Utils.hexStrToInt(binascii.b2a_hex(data)))
        elif hnd == thingy52.m_rotation_handle:
            self.__setData("rotation", Utils.hexStrToInt(binascii.b2a_hex(data)))
        elif hnd == thingy52.m_heading_handle:
            self.__setData("heading", Utils.hexStrToInt(binascii.b2a_hex(data)))
        elif hnd == thingy52.m_gravity_handle:
            self.__setData("gravity", Utils.hexStrToInt(binascii.b2a_hex(data)))
        elif hnd == thingy52.s_speaker_status_handle:
            self.__setData("speaker", Utils.hexStrToInt(binascii.b2a_hex(data)))
        elif hnd == thingy52.s_microphone_handle:
            # Microphone data is recognised but not used; the branch only
            # keeps it out of the "unknown notification" warning below.
            pass
        else:
            logger.warning("Unknown notification: %s, %s", hnd, data)

    def __extractPressureData(self, data):
        """Return ``(integer_part, decimal_part)`` of a pressure payload.

        The first four bytes are combined least-significant byte first; the
        last byte is the decimal part.
        """
        hexPayload = binascii.b2a_hex(data)
        pressure_int = sum(
            int(hexPayload[i * 2:i * 2 + 2], 16) << (8 * i)
            for i in range(4)
        )
        pressure_dec = int(hexPayload[-2:], 16)
        return (pressure_int, pressure_dec)

    def __extractGasData(self, data):
        """Return ``(eco2, tvoc)``, each read from two bytes, low byte first."""
        hexPayload = binascii.b2a_hex(data)
        eco2 = int(hexPayload[:2], 16) + (int(hexPayload[2:4], 16) << 8)
        tvoc = int(hexPayload[4:6], 16) + (int(hexPayload[6:8], 16) << 8)
        return eco2, tvoc

    def __extractColourData(self, data):
        """Return ``(red, green, blue, clear)``, taken from the first four bytes."""
        hexPayload = binascii.b2a_hex(data)
        red = int(hexPayload[:2], 16)
        green = int(hexPayload[2:4], 16)
        blue = int(hexPayload[4:6], 16)
        clear = int(hexPayload[6:8], 16)
        return red, green, blue, clear

    def __extractTapData(self, data):
        """Return ``(direction, count)``, taken from the first two bytes."""
        hexPayload = binascii.b2a_hex(data)
        # The payload is hex text, so it must be parsed base 16; base 10
        # fails for any byte value containing the digits a-f.
        direction = int(hexPayload[0:2], 16)
        count = int(hexPayload[2:4], 16)
        return (direction, count)