from time import sleep import logging from collections.abc import Callable logger = logging.getLogger(__name__) class Utils: @staticmethod def getReconnector( reconnect: Callable[[], None], description: str, minReconnectDelay: int, maxReconnectDelay: int, reconnectAttempts: int, reconnectDelayMultiplier: int, disconnectCallback: Callable[[], None] ): def fn(): logger.warn(f"Disconnected from {description}, trying to reconnect") reconnectCount = 0 reconnectDelay = minReconnectDelay while (reconnectAttempts < 0 or reconnectCount < reconnectAttempts): sleep(reconnectDelay) try: reconnect() logging.info(f"Reconnected to {description}") return except: reconnectDelay = min(minReconnectDelay, reconnectDelay * reconnectDelayMultiplier) reconnectCount += 1 logger.error(f"Unable to reconnect to {description} after {reconnectCount} attempts, exiting") disconnectCallback() return fn @staticmethod def hexStrToInt(s): # hex string to int i = int(s, 16) if i >= 2**7: i -= 2**8 return i