diff --git a/.github/workflows/flake8.yaml b/.github/workflows/flake8.yaml new file mode 100644 index 00000000..2fd67945 --- /dev/null +++ b/.github/workflows/flake8.yaml @@ -0,0 +1,32 @@ +name: Python flake8 + +on: + push: + branches: [ main, master, dev, development ] + pull_request: + branches: [ main, master, dev, development ] + +jobs: + test: + runs-on: ubuntu-latest + strategy: + matrix: + python-version: [3.6, 3.7, 3.8, 3.9] + steps: + - uses: actions/checkout@v2 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v2 + with: + python-version: ${{ matrix.python-version }} + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install flake8 wemake-python-styleguide + if [ -f requirements.txt ]; then pip install -r requirements.txt; fi + - name: Lint with flake8 + run: | + # stop the build if there are Python syntax errors or undefined names + flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics + # exit-zero treats all errors as warnings. ignore magic numbers and use double quotes and ignore numbers with zeroes before them. + # and ignore lowercase hex numbers and ignore isort incorrect imports + flake8 . --count --exit-zero --max-complexity=10 --max-line-length=90 --ignore=WPS432,WPS339,WPS341,I --inline-quotes double --statistics diff --git a/broadlink/__init__.py b/broadlink/__init__.py index 604ed88a..984e5191 100644 --- a/broadlink/__init__.py +++ b/broadlink/__init__.py @@ -4,7 +4,7 @@ from typing import Generator, List, Union, Tuple from .alarm import S1C -from .climate import hysen +from .climate import hysen, sq1 from .cover import dooya from .device import device, scan from .exceptions import exception @@ -20,88 +20,88 @@ 0x2716: (sp2, "NEO PRO", "Ankuoo"), 0x2717: (sp2, "NEO", "Ankuoo"), 0x2719: (sp2, "SP2-compatible", "Honeywell"), - 0x271a: (sp2, "SP2-compatible", "Honeywell"), + 0x271A: (sp2, "SP2-compatible", "Honeywell"), 0x2720: (sp2, "SP mini", "Broadlink"), 0x2728: (sp2, "SP2-compatible", "URANT"), 0x2733: (sp2, "SP3", "Broadlink"), 0x2736: (sp2, "SP mini+", "Broadlink"), - 0x273e: (sp2, "SP mini", "Broadlink"), + 0x273E: (sp2, "SP mini", "Broadlink"), 0x7530: (sp2, "SP2", "Broadlink (OEM)"), 0x7539: (sp2, "SP2-IL", "Broadlink (OEM)"), - 0x753e: (sp2, "SP mini 3", "Broadlink"), + 0x753E: (sp2, "SP mini 3", "Broadlink"), 0x7540: (sp2, "MP2", "Broadlink"), - 0X7544: (sp2, "SP2-CL", "Broadlink"), + 0x7544: (sp2, "SP2-CL", "Broadlink"), 0x7546: (sp2, "SP2-UK/BR/IN", "Broadlink (OEM)"), 0x7547: (sp2, "SC1", "Broadlink"), 0x7918: (sp2, "SP2", "Broadlink (OEM)"), 0x7919: (sp2, "SP2-compatible", "Honeywell"), - 0x791a: (sp2, "SP2-compatible", "Honeywell"), - 0x7d00: (sp2, "SP3-EU", "Broadlink (OEM)"), - 0x7d0d: (sp2, "SP mini 3", "Broadlink (OEM)"), + 0x791A: (sp2, "SP2-compatible", "Honeywell"), + 0x7D00: (sp2, "SP3-EU", "Broadlink (OEM)"), + 0x7D0D: (sp2, "SP mini 3", "Broadlink (OEM)"), 0x9479: (sp2, "SP3S-US", "Broadlink"), - 0x947a: (sp2, "SP3S-EU", "Broadlink"), - 0x756c: (sp4, "SP4M", "Broadlink"), + 0x947A: (sp2, "SP3S-EU", "Broadlink"), + 0x756C: (sp4, "SP4M", "Broadlink"), 0x7579: (sp4, "SP4L-EU", "Broadlink"), 0x7583: (sp4, "SP mini 3", "Broadlink"), - 0x7d11: (sp4, "SP mini 3", "Broadlink"), - 0x648b: (sp4b, "SP4M-US", "Broadlink"), + 0x7D11: (sp4, "SP mini 3", "Broadlink"), + 0x648B: (sp4b, "SP4M-US", "Broadlink"), 0x2712: (rm, "RM pro/pro+", "Broadlink"), - 0x272a: (rm, "RM pro", "Broadlink"), + 0x272A: (rm, "RM pro", "Broadlink"), 0x2737: (rm, "RM mini 3", "Broadlink"), - 0x273d: (rm, "RM pro", "Broadlink"), - 0x277c: (rm, "RM home", "Broadlink"), + 0x273D: (rm, "RM pro", "Broadlink"), + 0x277C: (rm, "RM home", "Broadlink"), 0x2783: (rm, "RM home", "Broadlink"), 0x2787: (rm, "RM pro", "Broadlink"), - 0x278b: (rm, "RM plus", "Broadlink"), - 0x278f: (rm, "RM mini", "Broadlink"), + 0x278B: (rm, "RM plus", "Broadlink"), + 0x278F: (rm, "RM mini", "Broadlink"), 0x2797: (rm, "RM pro+", "Broadlink"), - 0x279d: (rm, "RM pro+", "Broadlink"), - 0x27a1: (rm, "RM plus", "Broadlink"), - 0x27a6: (rm, "RM plus", "Broadlink"), - 0x27a9: (rm, "RM pro+", "Broadlink"), - 0x27c2: (rm, "RM mini 3", "Broadlink"), - 0x27c3: (rm, "RM pro+", "Broadlink"), - 0x27c7: (rm, "RM mini 3", "Broadlink"), - 0x27cc: (rm, "RM mini 3", "Broadlink"), - 0x27cd: (rm, "RM mini 3", "Broadlink"), - 0x27d0: (rm, "RM mini 3", "Broadlink"), - 0x27d1: (rm, "RM mini 3", "Broadlink"), - 0x27de: (rm, "RM mini 3", "Broadlink"), - 0x51da: (rm4, "RM4 mini", "Broadlink"), - 0x5f36: (rm4, "RM mini 3", "Broadlink"), + 0x279D: (rm, "RM pro+", "Broadlink"), + 0x27A1: (rm, "RM plus", "Broadlink"), + 0x27A6: (rm, "RM plus", "Broadlink"), + 0x27A9: (rm, "RM pro+", "Broadlink"), + 0x27C2: (rm, "RM mini 3", "Broadlink"), + 0x27C3: (rm, "RM pro+", "Broadlink"), + 0x27C7: (rm, "RM mini 3", "Broadlink"), + 0x27CC: (rm, "RM mini 3", "Broadlink"), + 0x27CD: (rm, "RM mini 3", "Broadlink"), + 0x27D0: (rm, "RM mini 3", "Broadlink"), + 0x27D1: (rm, "RM mini 3", "Broadlink"), + 0x27DE: (rm, "RM mini 3", "Broadlink"), + 0x51DA: (rm4, "RM4 mini", "Broadlink"), + 0x5F36: (rm4, "RM mini 3", "Broadlink"), 0x6026: (rm4, "RM4 pro", "Broadlink"), 0x6070: (rm4, "RM4C mini", "Broadlink"), - 0x610e: (rm4, "RM4 mini", "Broadlink"), - 0x610f: (rm4, "RM4C mini", "Broadlink"), - 0x61a2: (rm4, "RM4 pro", "Broadlink"), - 0x62bc: (rm4, "RM4 mini", "Broadlink"), - 0x62be: (rm4, "RM4C mini", "Broadlink"), - 0x648d: (rm4, "RM4 mini", "Broadlink"), - 0x649b: (rm4, "RM4 pro", "Broadlink"), - 0x653a: (rm4, "RM4 mini", "Broadlink"), + 0x610E: (rm4, "RM4 mini", "Broadlink"), + 0x610F: (rm4, "RM4C mini", "Broadlink"), + 0x61A2: (rm4, "RM4 pro", "Broadlink"), + 0x62BC: (rm4, "RM4 mini", "Broadlink"), + 0x62BE: (rm4, "RM4C mini", "Broadlink"), + 0x648D: (rm4, "RM4 mini", "Broadlink"), + 0x649B: (rm4, "RM4 pro", "Broadlink"), + 0x653A: (rm4, "RM4 mini", "Broadlink"), 0x2714: (a1, "e-Sensor", "Broadlink"), - 0x4eb5: (mp1, "MP1-1K4S", "Broadlink"), - 0x4ef7: (mp1, "MP1-1K4S", "Broadlink (OEM)"), - 0x4f1b: (mp1, "MP1-1K3S2U", "Broadlink (OEM)"), - 0x4f65: (mp1, "MP1-1K3S2U", "Broadlink"), + 0x4EB5: (mp1, "MP1-1K4S", "Broadlink"), + 0x4EF7: (mp1, "MP1-1K4S", "Broadlink (OEM)"), + 0x4F1B: (mp1, "MP1-1K3S2U", "Broadlink (OEM)"), + 0x4F65: (mp1, "MP1-1K3S2U", "Broadlink"), 0x5043: (lb1, "SB800TD", "Broadlink (OEM)"), - 0x504e: (lb1, "LB1", "Broadlink"), - 0x60c7: (lb1, "LB1", "Broadlink"), - 0x60c8: (lb1, "LB1", "Broadlink"), + 0x504E: (lb1, "LB1", "Broadlink"), + 0x60C7: (lb1, "LB1", "Broadlink"), + 0x60C8: (lb1, "LB1", "Broadlink"), 0x6112: (lb1, "LB1", "Broadlink"), 0x2722: (S1C, "S2KIT", "Broadlink"), - 0x4ead: (hysen, "HY02B05H", "Hysen"), - 0x4e4d: (dooya, "DT360E-45/20", "Dooya"), - 0x51e3: (bg1, "BG800/BG900", "BG Electrical"), + 0X4E2A: (sq1, "SQ", "Tornado"), + 0x4EAD: (hysen, "HY02B05H", "Hysen"), + 0x4E4D: (dooya, "DT360E-45/20", "Dooya"), + 0x51E3: (bg1, "BG800/BG900", "BG Electrical"), } - def gendevice( - dev_type: int, - host: Tuple[str, int], - mac: Union[bytes, str], - name: str = None, - is_locked: bool = None, + dev_type: int, + host: Tuple[str, int], + mac: Union[bytes, str], + name: str = None, + is_locked: bool = None, ) -> device: """Generate a device.""" try: @@ -122,10 +122,10 @@ def gendevice( def hello( - host: str, - port: int = 80, - timeout: int = 10, - local_ip_address: str = None, + host: str, + port: int = 80, + timeout: int = 10, + local_ip_address: str = None, ) -> device: """Direct device discovery. @@ -138,31 +138,27 @@ def hello( def discover( - timeout: int = 10, - local_ip_address: str = None, - discover_ip_address: str = '255.255.255.255', - discover_ip_port: int = 80, + timeout: int = 10, + local_ip_address: str = None, + discover_ip_address: str = "255.255.255.255", + discover_ip_port: int = 80, ) -> List[device]: """Discover devices connected to the local network.""" - responses = scan( - timeout, local_ip_address, discover_ip_address, discover_ip_port - ) + responses = scan(timeout, local_ip_address, discover_ip_address, discover_ip_port) return [gendevice(*resp) for resp in responses] def xdiscover( - timeout: int = 10, - local_ip_address: str = None, - discover_ip_address: str = '255.255.255.255', - discover_ip_port: int = 80, + timeout: int = 10, + local_ip_address: str = None, + discover_ip_address: str = "255.255.255.255", + discover_ip_port: int = 80, ) -> Generator[device, None, None]: """Discover devices connected to the local network. This function returns a generator that yields devices instantly. """ - responses = scan( - timeout, local_ip_address, discover_ip_address, discover_ip_port - ) + responses = scan(timeout, local_ip_address, discover_ip_address, discover_ip_port) for resp in responses: yield gendevice(*resp) @@ -191,13 +187,12 @@ def setup(ssid: str, password: str, security_mode: int) -> None: payload[0x85] = pass_length # Character length of password payload[0x86] = security_mode # Type of encryption - checksum = sum(payload, 0xbeaf) & 0xffff - payload[0x20] = checksum & 0xff # Checksum 1 position + checksum = sum(payload, 0xBEAF) & 0xFFFF + payload[0x20] = checksum & 0xFF # Checksum 1 position payload[0x21] = checksum >> 8 # Checksum 2 position - sock = socket.socket(socket.AF_INET, # Internet - socket.SOCK_DGRAM) # UDP + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Internet # UDP sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) - sock.sendto(payload, ('255.255.255.255', 80)) + sock.sendto(payload, ("255.255.255.255", 80)) sock.close() diff --git a/broadlink/alarm.py b/broadlink/alarm.py index faded9d4..e73b8fad 100644 --- a/broadlink/alarm.py +++ b/broadlink/alarm.py @@ -7,21 +7,21 @@ class S1C(device): """Controls a Broadlink S1C.""" _SENSORS_TYPES = { - 0x31: 'Door Sensor', # 49 as hex - 0x91: 'Key Fob', # 145 as hex, as serial on fob corpse - 0x21: 'Motion Sensor' # 33 as hex + 0x31: "Door Sensor", # 49 as hex + 0x91: "Key Fob", # 145 as hex, as serial on fob corpse + 0x21: "Motion Sensor", # 33 as hex } def __init__(self, *args, **kwargs) -> None: """Initialize the controller.""" device.__init__(self, *args, **kwargs) - self.type = 'S1C' + self.type = "S1C" def get_sensors_status(self) -> dict: """Return the state of the sensors.""" packet = bytearray(16) packet[0] = 0x06 # 0x06 - get sensors info, 0x07 - probably add sensors - response = self.send_packet(0x6a, packet) + response = self.send_packet(0x6A, packet) check_error(response[0x22:0x24]) payload = self.decrypt(response[0x38:]) if not payload: @@ -29,20 +29,20 @@ def get_sensors_status(self) -> dict: count = payload[0x4] sensor_data = payload[0x6:] sensors = [ - bytearray(sensor_data[i * 83:(i + 1) * 83]) + bytearray(sensor_data[i * 83 : (i + 1) * 83]) for i in range(len(sensor_data) // 83) ] return { - 'count': count, - 'sensors': [ + "count": count, + "sensors": [ { - 'status': sensor[0], - 'name': sensor[4:26].decode().strip('\x00'), - 'type': self._SENSORS_TYPES.get(sensor[3], 'Unknown'), - 'order': sensor[1], - 'serial': sensor[26:30].hex(), + "status": sensor[0], + "name": sensor[4:26].decode().strip("\x00"), + "type": self._SENSORS_TYPES.get(sensor[3], "Unknown"), + "order": sensor[1], + "serial": sensor[26:30].hex(), } for sensor in sensors if any(sensor[26:30]) - ] + ], } diff --git a/broadlink/climate.py b/broadlink/climate.py index f0c337e5..5a57ba0d 100644 --- a/broadlink/climate.py +++ b/broadlink/climate.py @@ -1,5 +1,8 @@ """Support for climate control.""" from typing import List +import logging +from enum import IntEnum, unique +import struct from .device import device from .exceptions import check_error @@ -33,19 +36,22 @@ def send_request(self, input_payload: bytes) -> bytes: request_payload.append((crc >> 8) & 0xFF) # send to device - response = self.send_packet(0x6a, request_payload) + response = self.send_packet(0x6A, request_payload) check_error(response[0x22:0x24]) response_payload = self.decrypt(response[0x38:]) # experimental check on CRC in response (first 2 bytes are len, and trailing bytes are crc) response_payload_len = response_payload[0] if response_payload_len + 2 > len(response_payload): - raise ValueError('hysen_response_error', 'first byte of response is not length') + raise ValueError( + "hysen_response_error", "first byte of response is not length" + ) crc = calculate_crc16(response_payload[2:response_payload_len]) if (response_payload[response_payload_len] == crc & 0xFF) and ( - response_payload[response_payload_len + 1] == (crc >> 8) & 0xFF): + response_payload[response_payload_len + 1] == (crc >> 8) & 0xFF + ): return response_payload[2:response_payload_len] - raise ValueError('hysen_response_error', 'CRC check on response failed') + raise ValueError("hysen_response_error", "CRC check on response failed") def get_temp(self) -> int: """Return the room temperature in degrees celsius.""" @@ -64,43 +70,53 @@ def get_full_status(self) -> dict: """ payload = self.send_request(bytearray([0x01, 0x03, 0x00, 0x00, 0x00, 0x16])) data = {} - data['remote_lock'] = payload[3] & 1 - data['power'] = payload[4] & 1 - data['active'] = (payload[4] >> 4) & 1 - data['temp_manual'] = (payload[4] >> 6) & 1 - data['room_temp'] = (payload[5] & 255) / 2.0 - data['thermostat_temp'] = (payload[6] & 255) / 2.0 - data['auto_mode'] = payload[7] & 15 - data['loop_mode'] = (payload[7] >> 4) & 15 - data['sensor'] = payload[8] - data['osv'] = payload[9] - data['dif'] = payload[10] - data['svh'] = payload[11] - data['svl'] = payload[12] - data['room_temp_adj'] = ((payload[13] << 8) + payload[14]) / 2.0 - if data['room_temp_adj'] > 32767: - data['room_temp_adj'] = 32767 - data['room_temp_adj'] - data['fre'] = payload[15] - data['poweron'] = payload[16] - data['unknown'] = payload[17] - data['external_temp'] = (payload[18] & 255) / 2.0 - data['hour'] = payload[19] - data['min'] = payload[20] - data['sec'] = payload[21] - data['dayofweek'] = payload[22] + data["remote_lock"] = payload[3] & 1 + data["power"] = payload[4] & 1 + data["active"] = (payload[4] >> 4) & 1 + data["temp_manual"] = (payload[4] >> 6) & 1 + data["room_temp"] = (payload[5] & 255) / 2.0 + data["thermostat_temp"] = (payload[6] & 255) / 2.0 + data["auto_mode"] = payload[7] & 15 + data["loop_mode"] = (payload[7] >> 4) & 15 + data["sensor"] = payload[8] + data["osv"] = payload[9] + data["dif"] = payload[10] + data["svh"] = payload[11] + data["svl"] = payload[12] + data["room_temp_adj"] = ((payload[13] << 8) + payload[14]) / 2.0 + if data["room_temp_adj"] > 32767: + data["room_temp_adj"] = 32767 - data["room_temp_adj"] + data["fre"] = payload[15] + data["poweron"] = payload[16] + data["unknown"] = payload[17] + data["external_temp"] = (payload[18] & 255) / 2.0 + data["hour"] = payload[19] + data["min"] = payload[20] + data["sec"] = payload[21] + data["dayofweek"] = payload[22] weekday = [] for i in range(0, 6): weekday.append( - {'start_hour': payload[2 * i + 23], 'start_minute': payload[2 * i + 24], 'temp': payload[i + 39] / 2.0}) + { + "start_hour": payload[2 * i + 23], + "start_minute": payload[2 * i + 24], + "temp": payload[i + 39] / 2.0, + } + ) - data['weekday'] = weekday + data["weekday"] = weekday weekend = [] for i in range(6, 8): weekend.append( - {'start_hour': payload[2 * i + 23], 'start_minute': payload[2 * i + 24], 'temp': payload[i + 39] / 2.0}) + { + "start_hour": payload[2 * i + 23], + "start_minute": payload[2 * i + 24], + "temp": payload[i + 39] / 2.0, + } + ) - data['weekend'] = weekend + data["weekend"] = weekend return data # Change controller mode @@ -140,8 +156,27 @@ def set_advanced( poweron: int, ) -> None: """Set advanced options.""" - input_payload = bytearray([0x01, 0x10, 0x00, 0x02, 0x00, 0x05, 0x0a, loop_mode, sensor, osv, dif, svh, svl, - (int(adj * 2) >> 8 & 0xff), (int(adj * 2) & 0xff), fre, poweron]) + input_payload = bytearray( + [ + 0x01, + 0x10, + 0x00, + 0x02, + 0x00, + 0x05, + 0x0A, + loop_mode, + sensor, + osv, + dif, + svh, + svl, + (int(adj * 2) >> 8 & 0xFF), + (int(adj * 2) & 0xFF), + fre, + poweron, + ] + ) self.send_request(input_payload) # For backwards compatibility only. Prefer calling set_mode directly. @@ -169,7 +204,11 @@ def set_power(self, power: int = 1, remote_lock: int = 0) -> None: # n.b. day=1 is Monday, ..., day=7 is Sunday def set_time(self, hour: int, minute: int, second: int, day: int) -> None: """Set the time.""" - self.send_request(bytearray([0x01, 0x10, 0x00, 0x08, 0x00, 0x02, 0x04, hour, minute, second, day])) + self.send_request( + bytearray( + [0x01, 0x10, 0x00, 0x08, 0x00, 0x02, 0x04, hour, minute, second, day] + ) + ) # Set timer schedule # Format is the same as you get from get_full_status. @@ -180,25 +219,294 @@ def set_time(self, hour: int, minute: int, second: int, day: int) -> None: def set_schedule(self, weekday: List[dict], weekend: List[dict]) -> None: """Set timer schedule.""" # Begin with some magic values ... - input_payload = bytearray([0x01, 0x10, 0x00, 0x0a, 0x00, 0x0c, 0x18]) + input_payload = bytearray([0x01, 0x10, 0x00, 0x0A, 0x00, 0x0C, 0x18]) # Now simply append times/temps # weekday times for i in range(0, 6): - input_payload.append(weekday[i]['start_hour']) - input_payload.append(weekday[i]['start_minute']) + input_payload.append(weekday[i]["start_hour"]) + input_payload.append(weekday[i]["start_minute"]) # weekend times for i in range(0, 2): - input_payload.append(weekend[i]['start_hour']) - input_payload.append(weekend[i]['start_minute']) + input_payload.append(weekend[i]["start_hour"]) + input_payload.append(weekend[i]["start_minute"]) # weekday temperatures for i in range(0, 6): - input_payload.append(int(weekday[i]['temp'] * 2)) + input_payload.append(int(weekday[i]["temp"] * 2)) # weekend temperatures for i in range(0, 2): - input_payload.append(int(weekend[i]['temp'] * 2)) + input_payload.append(int(weekend[i]["temp"] * 2)) self.send_request(input_payload) + + +class sq1(device): + """Controls Tornado SMART X SQ series air conditioners.""" + + @unique + class Mode(IntEnum): + AUTO = 0 + COOLING = 0x20 + DRYING = 0x40 + HEATING = 0x80 + FAN = 0xc0 + + @unique + class Speed(IntEnum): + HIGH = 0x20 + MID = 0x40 + LOW = 0x60 + AUTO = 0xa0 + + @unique + class SwingH(IntEnum): + ON = 0b000, + OFF = 0b111 + + @unique + class SwingV(IntEnum): + ON = 0b000, + POS1 = 1 + POS2 = 2 + POS3 = 3 + POS4 = 4 + POS5 = 5 + OFF = 0b111 + + def __init__(self, *args, **kwargs): + device.__init__(self, *args, **kwargs) + self.type = "Tornado SQ air conditioner" + + def _decode(self, response) -> bytes: + # RESPONSE_PREFIX = bytes([0xbb, 0x00, 0x07, 0x00, 0x00, 0x00]) + payload = self.decrypt(bytes(response[0x38:])) + + length = int.from_bytes(payload[:2], 'little') + checksum = self._calculate_checksum( + payload[2:length]).to_bytes(2, 'little') + if checksum == payload[length:length+2]: + logging.debug("Checksum incorrect (calculated %s actual %s).", + checksum.hex(), payload[length:length+2].hex()) + + return payload + + def _calculate_checksum(self, payload: bytes) -> int: + """Calculate checksum of given array, + by adding little endian words and subtracting from 0xffff. + + The first two bytes of most packets in the class are the length of the + payload and should be cropped out when using this function. + + Args: + payload (bytes): the payload + """ + s = sum([v if i % 2 == 0 else v << 8 for i, v in enumerate(payload)]) + # trim the overflow and add it to smallest bit + s = (s & 0xffff) + (s >> 16) + return (0xffff - s) + + def _encode(self, data: bytes) -> bytes: + """Encode data for transport.""" + payload = struct.pack("HHHH", 0x00BB, 0x8006, 0x0000, len(data)) + data + logging.debug("Payload:\n%s", payload.hex(' ')) + checksum = self._calculate_checksum(payload).to_bytes(2, 'little') + return (len(payload) + 2).to_bytes(2, 'little') + payload + checksum + + def _send_command(self, command: int, data: bytes = b'') -> bytes: + """Send a command to the unit. + + Known commands: + - Get AC info: 0x0121 + - Get states: 0x0111 + - Get sleep info: 0x0141 + """ + packet = self._encode(command.to_bytes(2, "little") + data) + logging.debug("Payload:\n%s", packet.hex(' ')) + response = self.send_packet(0x6a, packet) + check_error(response[0x22:0x24]) + return self._decode(response) + + def get_state(self) -> dict: + """Returns a dictionary with the unit's parameters. + + Returns: + dict: + power (bool): + target_temp (float): temperature set point 16> 3) + + (0.0 if (payload[0xe] & 0b10000000) == 0 else 0.5)) # noqa E501 + + data['swing_h'] = self.SwingH((payload[0x0d] & 0b11100000) >> 5) + data['swing_v'] = self.SwingV(payload[0x0c] & 0b111) + + data['mode'] = self.Mode(payload[0x11] & ~ 0b111) + + data['speed'] = self.Speed(payload[0x0f]) + + data['mute'] = bool(payload[0x10] == 0x80) + data['turbo'] = bool(payload[0x10] == 0x40) + + data['sleep'] = bool(payload[0x11] & 0b100) + + data['health'] = bool(payload[0x14] & 0b10) + data['clean'] = bool(payload[0x14] & 0b100) + + data['display'] = bool(payload[0x16] & 0b10000) + data['mildew'] = bool(payload[0x16] & 0b1000) + + logging.debug("Data: %s", data) + + return data + + def get_ac_info(self) -> dict: + """Returns dictionary with AC info. + + Returns: + dict: + state (bool): power + ambient_temp (float): ambient temperature + """ + payload = self._send_command(0x121) + if (len(payload) != 48): + raise ValueError(f"unexpected payload size: {len(payload)}") + + logging.debug("Received payload:\n%s", payload.hex(' ')) + + # Length is 34 (0x22), the next 11 bytes are + # the same: bb 00 07 00 00 00 18 00 01 21 c0, + # bytes 0x23,0x24 are the checksum. + data = {} + data['state'] = payload[0x0d] & 0b1 == 0b1 + + ambient_temp = payload[0x11] & 0b00011111 + if ambient_temp: + data['ambient_temp'] = (ambient_temp + + float(payload[0x21] & 0b00011111) / 10.0) + + logging.debug("Data: %s", data) + return data + + def set_state(self, state: dict) -> bool: + """Set parameters of unit. + + Args: + state (dict): if any are missing the current value will be retrived + power (bool): + target_temp (float): temperature set point 16 0: + raise ValueError(f"unknown argument(s) {unknown_keys}") + + missing_keys = [key for key in keys if key not in state] + if len(missing_keys) > 0: + try: + received_state = self.get_state() + except RuntimeError as e: + if "unexpected payload size: 48" in str(e): + # Occasionally a 48 byte payload gets mixed in, + # a retry should suffice. + received_state = self.get_state() + else: + raise e + logging.debug("Raw state %s", state) + received_state.update(state) + state = received_state + logging.debug("Filled state %s", state) + + state['target_temp'] = round(state['target_temp'] * 2) / 2 + if not (16 <= state['target_temp'] <= 32): + raise ValueError(f"target_temp out of range: {state['target_temp']}") # noqa E501 + + # Creating a new instance verifies the type + swing_R = self.SwingH(state['swing_h']) + swing_L = self.SwingV(state['swing_v']) + + mode = self.Mode(state['mode']) + + if state['mute'] and state['turbo']: + raise ValueError("mute and turbo can't be on at once") + elif state['mute']: + speed_R = 0x80 + if state['mode'] != 'fan': + raise ValueError("mute is only available in fan mode") + state['speed'] = self.Speed.LOW + elif state['turbo']: + speed_R = 0x40 + if state['mode'] not in ('cooling', 'heating'): + raise ValueError("turbo is only available in cooling/heating") + state['speed'] = self.Speed.HIGH + else: + speed_R = 0x00 + + speed_L = self.Speed(state['speed']) + + data = bytes( + [ + (int(state['target_temp']) - 8 << 3) | swing_L, + (swing_R << 5) | CMND_0B_RMASK, + ((state['target_temp'] % 1 == 0.5) << 7) | CMND_0C_RMASK, + speed_L, + speed_R, + mode | (state['sleep'] << 2), + 0x00, + 0x00, + (state['power'] << 5 | state['clean'] << 2 | 0b11 if state['health'] else 0b00), + 0x00, + state['display'] << 4 | state['mildew'] << 3, + 0x00, + CMND_16 + ] + ) + logging.debug("Constructed payload data:\n%s", data.hex(' ')) + + response_payload = self._send_command(0x0101, data) + logging.debug("Response payload:\n%s", response_payload.hex(' ')) + # Response payloads are 16 bytes long, + # Bytes 0d-0e are the checksum of the sent command. diff --git a/broadlink/cover.py b/broadlink/cover.py index 236e747c..2691fe97 100644 --- a/broadlink/cover.py +++ b/broadlink/cover.py @@ -17,12 +17,12 @@ def _send(self, magic1: int, magic2: int) -> int: """Send a packet to the device.""" packet = bytearray(16) packet[0] = 0x09 - packet[2] = 0xbb + packet[2] = 0xBB packet[3] = magic1 packet[4] = magic2 - packet[9] = 0xfa + packet[9] = 0xFA packet[10] = 0x44 - response = self.send_packet(0x6a, packet) + response = self.send_packet(0x6A, packet) check_error(response[0x22:0x24]) payload = self.decrypt(response[0x38:]) return payload[4] @@ -41,7 +41,7 @@ def stop(self) -> int: def get_percentage(self) -> int: """Return the position of the curtain.""" - return self._send(0x06, 0x5d) + return self._send(0x06, 0x5D) def set_percentage_and_wait(self, new_percentage: int) -> None: """Set the position of the curtain.""" diff --git a/broadlink/device.py b/broadlink/device.py index 9392024f..e040de79 100644 --- a/broadlink/device.py +++ b/broadlink/device.py @@ -15,10 +15,10 @@ def scan( - timeout: int = 10, - local_ip_address: str = None, - discover_ip_address: str = '255.255.255.255', - discover_ip_port: int = 80, + timeout: int = 10, + local_ip_address: str = None, + discover_ip_address: str = "255.255.255.255", + discover_ip_port: int = 80, ) -> Generator[HelloResponse, None, None]: """Broadcast a hello message and yield responses.""" conn = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) @@ -36,38 +36,38 @@ def scan( timezone = int(time.timezone / -3600) if timezone < 0: - packet[0x08] = 0xff + timezone - 1 - packet[0x09] = 0xff - packet[0x0a] = 0xff - packet[0x0b] = 0xff + packet[0x08] = 0xFF + timezone - 1 + packet[0x09] = 0xFF + packet[0x0A] = 0xFF + packet[0x0B] = 0xFF else: packet[0x08] = timezone packet[0x09] = 0 - packet[0x0a] = 0 - packet[0x0b] = 0 + packet[0x0A] = 0 + packet[0x0B] = 0 year = datetime.now().year - packet[0x0c] = year & 0xff - packet[0x0d] = year >> 8 - packet[0x0e] = datetime.now().minute - packet[0x0f] = datetime.now().hour + packet[0x0C] = year & 0xFF + packet[0x0D] = year >> 8 + packet[0x0E] = datetime.now().minute + packet[0x0F] = datetime.now().hour subyear = str(year)[2:] packet[0x10] = int(subyear) packet[0x11] = datetime.now().isoweekday() packet[0x12] = datetime.now().day packet[0x13] = datetime.now().month - address = local_ip_address.split('.') + address = local_ip_address.split(".") packet[0x18] = int(address[3]) packet[0x19] = int(address[2]) - packet[0x1a] = int(address[1]) - packet[0x1b] = int(address[0]) - packet[0x1c] = port & 0xff - packet[0x1d] = port >> 8 + packet[0x1A] = int(address[1]) + packet[0x1B] = int(address[0]) + packet[0x1C] = port & 0xFF + packet[0x1D] = port >> 8 packet[0x26] = 6 - checksum = sum(packet, 0xbeaf) & 0xffff - packet[0x20] = checksum & 0xff + checksum = sum(packet, 0xBEAF) & 0xFFFF + packet[0x20] = checksum & 0xFF packet[0x21] = checksum >> 8 starttime = time.time() @@ -85,12 +85,12 @@ def scan( break devtype = response[0x34] | response[0x35] << 8 - mac = bytes(reversed(response[0x3a:0x40])) + mac = bytes(reversed(response[0x3A:0x40])) if (host, mac, devtype) in discovered: continue discovered.append((host, mac, devtype)) - name = response[0x40:].split(b'\x00')[0].decode('utf-8') + name = response[0x40:].split(b"\x00")[0].decode("utf-8") is_locked = bool(response[-1]) yield devtype, host, mac, name, is_locked finally: @@ -101,33 +101,33 @@ class device: """Controls a Broadlink device.""" def __init__( - self, - host: Tuple[str, int], - mac: Union[bytes, str], - devtype: int, - timeout: int = 10, - name: str = None, - model: str = None, - manufacturer: str = None, - is_locked: bool = None, + self, + host: Tuple[str, int], + mac: Union[bytes, str], + devtype: int, + timeout: int = 10, + name: str = None, + model: str = None, + manufacturer: str = None, + is_locked: bool = None, ) -> None: """Initialize the controller.""" self.host = host self.mac = bytes.fromhex(mac) if isinstance(mac, str) else mac - self.devtype = devtype if devtype is not None else 0x272a + self.devtype = devtype if devtype is not None else 0x272A self.timeout = timeout self.name = name self.model = model self.manufacturer = manufacturer self.is_locked = is_locked - self.count = random.randrange(0xffff) - self.iv = bytes.fromhex('562e17996d093d28ddb3ba695a2e6f58') + self.count = random.randrange(0xFFFF) + self.iv = bytes.fromhex("562e17996d093d28ddb3ba695a2e6f58") self.id = bytes(4) self.type = "Unknown" self.lock = threading.Lock() self.aes = None - key = bytes.fromhex('097628343fe99e23765c1513accf8b02') + key = bytes.fromhex("097628343fe99e23765c1513accf8b02") self.update_aes(key) def __repr__(self): @@ -138,7 +138,7 @@ def __repr__(self): hex(self.devtype), self.host[0], self.host[1], - ':'.join(format(x, '02x') for x in self.mac), + ":".join(format(x, "02x") for x in self.mac), self.name, "Locked" if self.is_locked else "Unlocked", ) @@ -175,24 +175,24 @@ def auth(self) -> bool: payload[0x07] = 0x31 payload[0x08] = 0x31 payload[0x09] = 0x31 - payload[0x0a] = 0x31 - payload[0x0b] = 0x31 - payload[0x0c] = 0x31 - payload[0x0d] = 0x31 - payload[0x0e] = 0x31 - payload[0x0f] = 0x31 + payload[0x0A] = 0x31 + payload[0x0B] = 0x31 + payload[0x0C] = 0x31 + payload[0x0D] = 0x31 + payload[0x0E] = 0x31 + payload[0x0F] = 0x31 payload[0x10] = 0x31 payload[0x11] = 0x31 payload[0x12] = 0x31 - payload[0x1e] = 0x01 - payload[0x2d] = 0x01 - payload[0x30] = ord('T') - payload[0x31] = ord('e') - payload[0x32] = ord('s') - payload[0x33] = ord('t') - payload[0x34] = ord(' ') - payload[0x35] = ord(' ') - payload[0x36] = ord('1') + payload[0x1E] = 0x01 + payload[0x2D] = 0x01 + payload[0x30] = ord("T") + payload[0x31] = ord("e") + payload[0x32] = ord("s") + payload[0x33] = ord("t") + payload[0x34] = ord(" ") + payload[0x35] = ord(" ") + payload[0x36] = ord("1") response = self.send_packet(0x65, payload) check_error(response[0x22:0x24]) @@ -233,7 +233,7 @@ def hello(self, local_ip_address=None) -> bool: def get_fwversion(self) -> int: """Get firmware version.""" packet = bytearray([0x68]) - response = self.send_packet(0x6a, packet) + response = self.send_packet(0x6A, packet) check_error(response[0x22:0x24]) payload = self.decrypt(response[0x38:]) return payload[0x4] | payload[0x5] << 8 @@ -241,20 +241,20 @@ def get_fwversion(self) -> int: def set_name(self, name: str) -> None: """Set device name.""" packet = bytearray(4) - packet += name.encode('utf-8') + packet += name.encode("utf-8") packet += bytearray(0x50 - len(packet)) packet[0x43] = bool(self.is_locked) - response = self.send_packet(0x6a, packet) + response = self.send_packet(0x6A, packet) check_error(response[0x22:0x24]) self.name = name def set_lock(self, state: bool) -> None: """Lock/unlock the device.""" packet = bytearray(4) - packet += self.name.encode('utf-8') + packet += self.name.encode("utf-8") packet += bytearray(0x50 - len(packet)) packet[0x43] = bool(state) - response = self.send_packet(0x6a, packet) + response = self.send_packet(0x6A, packet) check_error(response[0x22:0x24]) self.is_locked = bool(state) @@ -264,27 +264,27 @@ def get_type(self) -> str: def send_packet(self, command: int, payload: bytes) -> bytes: """Send a packet to the device.""" - self.count = (self.count + 1) & 0xffff + self.count = (self.count + 1) & 0xFFFF packet = bytearray(0x38) - packet[0x00] = 0x5a - packet[0x01] = 0xa5 - packet[0x02] = 0xaa + packet[0x00] = 0x5A + packet[0x01] = 0xA5 + packet[0x02] = 0xAA packet[0x03] = 0x55 - packet[0x04] = 0x5a - packet[0x05] = 0xa5 - packet[0x06] = 0xaa + packet[0x04] = 0x5A + packet[0x05] = 0xA5 + packet[0x06] = 0xAA packet[0x07] = 0x55 - packet[0x24] = self.devtype & 0xff + packet[0x24] = self.devtype & 0xFF packet[0x25] = self.devtype >> 8 packet[0x26] = command - packet[0x28] = self.count & 0xff + packet[0x28] = self.count & 0xFF packet[0x29] = self.count >> 8 - packet[0x2a] = self.mac[5] - packet[0x2b] = self.mac[4] - packet[0x2c] = self.mac[3] - packet[0x2d] = self.mac[2] - packet[0x2e] = self.mac[1] - packet[0x2f] = self.mac[0] + packet[0x2A] = self.mac[5] + packet[0x2B] = self.mac[4] + packet[0x2C] = self.mac[3] + packet[0x2D] = self.mac[2] + packet[0x2E] = self.mac[1] + packet[0x2F] = self.mac[0] packet[0x30] = self.id[3] packet[0x31] = self.id[2] packet[0x32] = self.id[1] @@ -296,16 +296,16 @@ def send_packet(self, command: int, payload: bytes) -> bytes: payload = bytearray(payload) payload += bytearray(padding) - checksum = sum(payload, 0xbeaf) & 0xffff - packet[0x34] = checksum & 0xff + checksum = sum(payload, 0xBEAF) & 0xFFFF + packet[0x34] = checksum & 0xFF packet[0x35] = checksum >> 8 payload = self.encrypt(payload) for i in range(len(payload)): packet.append(payload[i]) - checksum = sum(packet, 0xbeaf) & 0xffff - packet[0x20] = checksum & 0xff + checksum = sum(packet, 0xBEAF) & 0xFFFF + packet[0x20] = checksum & 0xFF packet[0x21] = checksum >> 8 start_time = time.time() @@ -329,7 +329,7 @@ def send_packet(self, command: int, payload: bytes) -> bytes: raise exception(-4007) # Length error. checksum = resp[0x20] | (resp[0x21] << 8) - if sum(resp, 0xbeaf) - sum(resp[0x20:0x22]) & 0xffff != checksum: + if sum(resp, 0xBEAF) - sum(resp[0x20:0x22]) & 0xFFFF != checksum: raise exception(-4008) # Checksum error. return resp diff --git a/broadlink/light.py b/broadlink/light.py index cffc77d5..adadfab5 100644 --- a/broadlink/light.py +++ b/broadlink/light.py @@ -11,14 +11,14 @@ class lb1(device): state_dict = [] effect_map_dict = { - 'lovely color': 0, - 'flashlight': 1, - 'lightning': 2, - 'color fading': 3, - 'color breathing': 4, - 'multicolor breathing': 5, - 'color jumping': 6, - 'multicolor jumping': 7, + "lovely color": 0, + "flashlight": 1, + "lightning": 2, + "color fading": 3, + "color breathing": 4, + "multicolor breathing": 5, + "color jumping": 6, + "multicolor jumping": 7, } def __init__(self, *args, **kwargs) -> None: @@ -26,36 +26,38 @@ def __init__(self, *args, **kwargs) -> None: device.__init__(self, *args, **kwargs) self.type = "SmartBulb" - def send_command(self, command: str, type: str = 'set') -> None: + def send_command(self, command: str, type: str = "set") -> None: """Send a command to the device.""" - packet = bytearray(16+(int(len(command)/16) + 1)*16) - packet[0x00] = 0x0c + len(command) & 0xff - packet[0x02] = 0xa5 - packet[0x03] = 0xa5 - packet[0x04] = 0x5a - packet[0x05] = 0x5a - packet[0x08] = 0x02 if type == "set" else 0x01 # 0x01 => query, # 0x02 => set - packet[0x09] = 0x0b - packet[0x0a] = len(command) - packet[0x0e:] = map(ord, command) + packet = bytearray(16 + (int(len(command) / 16) + 1) * 16) + packet[0x00] = 0x0C + len(command) & 0xFF + packet[0x02] = 0xA5 + packet[0x03] = 0xA5 + packet[0x04] = 0x5A + packet[0x05] = 0x5A + packet[0x08] = 0x02 if type == "set" else 0x01 # 0x01 => query, # 0x02 => set + packet[0x09] = 0x0B + packet[0x0A] = len(command) + packet[0x0E:] = map(ord, command) - checksum = sum(packet, 0xbeaf) & 0xffff - packet[0x06] = checksum & 0xff # Checksum 1 position + checksum = sum(packet, 0xBEAF) & 0xFFFF + packet[0x06] = checksum & 0xFF # Checksum 1 position packet[0x07] = checksum >> 8 # Checksum 2 position - response = self.send_packet(0x6a, packet) + response = self.send_packet(0x6A, packet) check_error(response[0x36:0x38]) payload = self.decrypt(response[0x38:]) - responseLength = int(payload[0x0a]) | (int(payload[0x0b]) << 8) + responseLength = int(payload[0x0A]) | (int(payload[0x0B]) << 8) if responseLength > 0: - self.state_dict = json.loads(payload[0x0e:0x0e+responseLength]) + self.state_dict = json.loads(payload[0x0E : 0x0E + responseLength]) def set_json(self, jsonstr: str) -> str: """Send a command to the device and return state.""" reconvert = json.loads(jsonstr) - if 'bulb_sceneidx' in reconvert.keys(): - reconvert['bulb_sceneidx'] = self.effect_map_dict.get(reconvert['bulb_sceneidx'], 255) + if "bulb_sceneidx" in reconvert.keys(): + reconvert["bulb_sceneidx"] = self.effect_map_dict.get( + reconvert["bulb_sceneidx"], 255 + ) self.send_command(json.dumps(reconvert)) return json.dumps(self.state_dict) diff --git a/broadlink/remote.py b/broadlink/remote.py index 4a427f69..b6528d68 100644 --- a/broadlink/remote.py +++ b/broadlink/remote.py @@ -17,45 +17,45 @@ def check_data(self) -> bytes: """Return the last captured code.""" packet = bytearray(self._request_header) packet.append(0x04) - response = self.send_packet(0x6a, packet) + response = self.send_packet(0x6A, packet) check_error(response[0x22:0x24]) payload = self.decrypt(response[0x38:]) - return payload[len(self._request_header) + 4:] + return payload[len(self._request_header) + 4 :] def send_data(self, data: bytes) -> None: """Send a code to the device.""" packet = bytearray(self._code_sending_header) packet += bytearray([0x02, 0x00, 0x00, 0x00]) packet += data - response = self.send_packet(0x6a, packet) + response = self.send_packet(0x6A, packet) check_error(response[0x22:0x24]) def enter_learning(self) -> None: """Enter infrared learning mode.""" packet = bytearray(self._request_header) packet.append(0x03) - response = self.send_packet(0x6a, packet) + response = self.send_packet(0x6A, packet) check_error(response[0x22:0x24]) def sweep_frequency(self) -> None: """Sweep frequency.""" packet = bytearray(self._request_header) packet.append(0x19) - response = self.send_packet(0x6a, packet) + response = self.send_packet(0x6A, packet) check_error(response[0x22:0x24]) def cancel_sweep_frequency(self) -> None: """Cancel sweep frequency.""" packet = bytearray(self._request_header) - packet.append(0x1e) - response = self.send_packet(0x6a, packet) + packet.append(0x1E) + response = self.send_packet(0x6A, packet) check_error(response[0x22:0x24]) def check_frequency(self) -> bool: """Return True if the frequency was identified successfully.""" packet = bytearray(self._request_header) - packet.append(0x1a) - response = self.send_packet(0x6a, packet) + packet.append(0x1A) + response = self.send_packet(0x6A, packet) check_error(response[0x22:0x24]) payload = self.decrypt(response[0x38:]) if payload[len(self._request_header) + 4] == 1: @@ -65,8 +65,8 @@ def check_frequency(self) -> bool: def find_rf_packet(self) -> bool: """Enter radiofrequency learning mode.""" packet = bytearray(self._request_header) - packet.append(0x1b) - response = self.send_packet(0x6a, packet) + packet.append(0x1B) + response = self.send_packet(0x6A, packet) check_error(response[0x22:0x24]) payload = self.decrypt(response[0x38:]) if payload[len(self._request_header) + 4] == 1: @@ -77,10 +77,10 @@ def _check_sensors(self, command: int) -> bytes: """Return the state of the sensors in raw format.""" packet = bytearray(self._request_header) packet.append(command) - response = self.send_packet(0x6a, packet) + response = self.send_packet(0x6A, packet) check_error(response[0x22:0x24]) payload = self.decrypt(response[0x38:]) - return bytearray(payload[len(self._request_header) + 4:]) + return bytearray(payload[len(self._request_header) + 4 :]) def check_temperature(self) -> int: """Return the temperature.""" @@ -90,7 +90,7 @@ def check_temperature(self) -> int: def check_sensors(self) -> dict: """Return the state of the sensors.""" data = self._check_sensors(0x1) - return {'temperature': data[0x0] + data[0x1] / 10.0} + return {"temperature": data[0x0] + data[0x1] / 10.0} class rm4(rm): @@ -100,8 +100,8 @@ def __init__(self, *args, **kwargs) -> None: """Initialize the controller.""" device.__init__(self, *args, **kwargs) self.type = "RM4" - self._request_header = b'\x04\x00' - self._code_sending_header = b'\xda\x00' + self._request_header = b"\x04\x00" + self._code_sending_header = b"\xda\x00" def check_temperature(self) -> int: """Return the temperature.""" @@ -117,6 +117,6 @@ def check_sensors(self) -> dict: """Return the state of the sensors.""" data = self._check_sensors(0x24) return { - 'temperature': data[0x0] + data[0x1] / 100.0, - 'humidity': data[0x2] + data[0x3] / 100.0 + "temperature": data[0x0] + data[0x1] / 100.0, + "humidity": data[0x2] + data[0x3] / 100.0, } diff --git a/broadlink/sensor.py b/broadlink/sensor.py index 63c23b4b..ef7f6f12 100644 --- a/broadlink/sensor.py +++ b/broadlink/sensor.py @@ -7,9 +7,9 @@ class a1(device): """Controls a Broadlink A1.""" _SENSORS_AND_LEVELS = ( - ('light', ('dark', 'dim', 'normal', 'bright')), - ('air_quality', ('excellent', 'good', 'normal', 'bad')), - ('noise', ('quiet', 'normal', 'noisy')), + ("light", ("dark", "dim", "normal", "bright")), + ("air_quality", ("excellent", "good", "normal", "bad")), + ("noise", ("quiet", "normal", "noisy")), ) def __init__(self, *args, **kwargs) -> None: @@ -24,20 +24,20 @@ def check_sensors(self) -> dict: try: data[sensor] = levels[data[sensor]] except IndexError: - data[sensor] = 'unknown' + data[sensor] = "unknown" return data def check_sensors_raw(self) -> dict: """Return the state of the sensors in raw format.""" packet = bytearray([0x1]) - response = self.send_packet(0x6a, packet) + response = self.send_packet(0x6A, packet) check_error(response[0x22:0x24]) payload = self.decrypt(response[0x38:]) data = bytearray(payload[0x4:]) return { - 'temperature': data[0x0] + data[0x1] / 10.0, - 'humidity': data[0x2] + data[0x3] / 10.0, - 'light': data[0x4], - 'air_quality': data[0x6], - 'noise': data[0x8], + "temperature": data[0x0] + data[0x1] / 10.0, + "humidity": data[0x2] + data[0x3] / 10.0, + "light": data[0x4], + "air_quality": data[0x6], + "noise": data[0x8], } diff --git a/broadlink/switch.py b/broadlink/switch.py index efea03d7..9b3cf821 100644 --- a/broadlink/switch.py +++ b/broadlink/switch.py @@ -17,19 +17,19 @@ def __init__(self, *args, **kwargs) -> None: def set_power_mask(self, sid_mask: int, state: bool) -> None: """Set the power state of the device.""" packet = bytearray(16) - packet[0x00] = 0x0d - packet[0x02] = 0xa5 - packet[0x03] = 0xa5 - packet[0x04] = 0x5a - packet[0x05] = 0x5a - packet[0x06] = 0xb2 + ((sid_mask << 1) if state else sid_mask) - packet[0x07] = 0xc0 + packet[0x00] = 0x0D + packet[0x02] = 0xA5 + packet[0x03] = 0xA5 + packet[0x04] = 0x5A + packet[0x05] = 0x5A + packet[0x06] = 0xB2 + ((sid_mask << 1) if state else sid_mask) + packet[0x07] = 0xC0 packet[0x08] = 0x02 - packet[0x0a] = 0x03 - packet[0x0d] = sid_mask - packet[0x0e] = sid_mask if state else 0 + packet[0x0A] = 0x03 + packet[0x0D] = sid_mask + packet[0x0E] = sid_mask if state else 0 - response = self.send_packet(0x6a, packet) + response = self.send_packet(0x6A, packet) check_error(response[0x22:0x24]) def set_power(self, sid: int, state: bool) -> None: @@ -40,30 +40,30 @@ def set_power(self, sid: int, state: bool) -> None: def check_power_raw(self) -> bool: """Return the power state of the device in raw format.""" packet = bytearray(16) - packet[0x00] = 0x0a - packet[0x02] = 0xa5 - packet[0x03] = 0xa5 - packet[0x04] = 0x5a - packet[0x05] = 0x5a - packet[0x06] = 0xae - packet[0x07] = 0xc0 + packet[0x00] = 0x0A + packet[0x02] = 0xA5 + packet[0x03] = 0xA5 + packet[0x04] = 0x5A + packet[0x05] = 0x5A + packet[0x06] = 0xAE + packet[0x07] = 0xC0 packet[0x08] = 0x01 - response = self.send_packet(0x6a, packet) + response = self.send_packet(0x6A, packet) check_error(response[0x22:0x24]) payload = self.decrypt(response[0x38:]) - return payload[0x0e] + return payload[0x0E] def check_power(self) -> dict: """Return the power state of the device.""" state = self.check_power_raw() if state is None: - return {'s1': None, 's2': None, 's3': None, 's4': None} + return {"s1": None, "s2": None, "s3": None, "s4": None} data = {} - data['s1'] = bool(state & 0x01) - data['s2'] = bool(state & 0x02) - data['s3'] = bool(state & 0x04) - data['s4'] = bool(state & 0x08) + data["s1"] = bool(state & 0x01) + data["s2"] = bool(state & 0x02) + data["s3"] = bool(state & 0x04) + data["s4"] = bool(state & 0x08) return data @@ -80,8 +80,8 @@ def get_state(self) -> dict: Example: `{"pwr":1,"pwr1":1,"pwr2":0,"maxworktime":60,"maxworktime1":60,"maxworktime2":0,"idcbrightness":50}` """ - packet = self._encode(1, b'{}') - response = self.send_packet(0x6a, packet) + packet = self._encode(1, b"{}") + response = self.send_packet(0x6A, packet) check_error(response[0x22:0x24]) return self._decode(response) @@ -98,22 +98,22 @@ def set_state( """Set the power state of the device.""" data = {} if pwr is not None: - data['pwr'] = int(bool(pwr)) + data["pwr"] = int(bool(pwr)) if pwr1 is not None: - data['pwr1'] = int(bool(pwr1)) + data["pwr1"] = int(bool(pwr1)) if pwr2 is not None: - data['pwr2'] = int(bool(pwr2)) + data["pwr2"] = int(bool(pwr2)) if maxworktime is not None: - data['maxworktime'] = maxworktime + data["maxworktime"] = maxworktime if maxworktime1 is not None: - data['maxworktime1'] = maxworktime1 + data["maxworktime1"] = maxworktime1 if maxworktime2 is not None: - data['maxworktime2'] = maxworktime2 + data["maxworktime2"] = maxworktime2 if idcbrightness is not None: - data['idcbrightness'] = idcbrightness - js = json.dumps(data).encode('utf8') + data["idcbrightness"] = idcbrightness + js = json.dumps(data).encode("utf8") packet = self._encode(2, js) - response = self.send_packet(0x6a, packet) + response = self.send_packet(0x6A, packet) check_error(response[0x22:0x24]) return self._decode(response) @@ -129,20 +129,22 @@ def _encode(self, flag: int, js: str) -> bytes: # 0x0e- json data packet = bytearray(14) length = 4 + 2 + 2 + 4 + len(js) - struct.pack_into('> 8 return packet def _decode(self, response: bytes) -> dict: """Decode a message.""" payload = self.decrypt(response[0x38:]) - js_len = struct.unpack_from(' None: packet[4] = 3 if state else 2 else: packet[4] = 1 if state else 0 - response = self.send_packet(0x6a, packet) + response = self.send_packet(0x6A, packet) check_error(response[0x22:0x24]) def set_nightlight(self, state: bool) -> None: @@ -189,14 +191,14 @@ def set_nightlight(self, state: bool) -> None: packet[4] = 3 if state else 1 else: packet[4] = 2 if state else 0 - response = self.send_packet(0x6a, packet) + response = self.send_packet(0x6A, packet) check_error(response[0x22:0x24]) def check_power(self) -> bool: """Return the power state of the device.""" packet = bytearray(16) packet[0] = 1 - response = self.send_packet(0x6a, packet) + response = self.send_packet(0x6A, packet) check_error(response[0x22:0x24]) payload = self.decrypt(response[0x38:]) return bool(payload[0x4] == 1 or payload[0x4] == 3 or payload[0x4] == 0xFD) @@ -205,7 +207,7 @@ def check_nightlight(self) -> bool: """Return the state of the night light.""" packet = bytearray(16) packet[0] = 1 - response = self.send_packet(0x6a, packet) + response = self.send_packet(0x6A, packet) check_error(response[0x22:0x24]) payload = self.decrypt(response[0x38:]) return bool(payload[0x4] == 2 or payload[0x4] == 3 or payload[0x4] == 0xFF) @@ -213,14 +215,14 @@ def check_nightlight(self) -> bool: def get_energy(self) -> int: """Return the energy state of the device.""" packet = bytearray([8, 0, 254, 1, 5, 1, 0, 0, 0, 45]) - response = self.send_packet(0x6a, packet) + response = self.send_packet(0x6A, packet) check_error(response[0x22:0x24]) payload = self.decrypt(response[0x38:]) - return int(hex(payload[0x07] * 256 + payload[0x06])[2:]) + int(hex(payload[0x05])[2:]) / 100.0 + return int((payload[0x07] + payload[0x06] / 100) * 100) + payload[0x05] / 100 class sp4(device): - """Controls a Broadlink SP4.""" + """Controls a Broadlink SP4.""" def __init__(self, *args, **kwargs) -> None: """Initialize the controller.""" @@ -314,7 +316,18 @@ def _encode(self, flag: int, state: dict) -> bytes: payload = json.dumps(state, separators=(",", ":")).encode() packet = bytearray(14) length = 4 + 2 + 2 + 4 + len(payload) - struct.pack_into('=2.1.1'], - description='Python API for controlling Broadlink IR controllers', + install_requires=["cryptography>=2.1.1"], + description="Python API for controlling Broadlink IR controllers", classifiers=[ - 'Development Status :: 4 - Beta', - 'Intended Audience :: Developers', - 'License :: OSI Approved :: MIT License', - 'Operating System :: OS Independent', - 'Programming Language :: Python', + "Development Status :: 4 - Beta", + "Intended Audience :: Developers", + "License :: OSI Approved :: MIT License", + "Operating System :: OS Independent", + "Programming Language :: Python", ], include_package_data=True, zip_safe=False,