From a1477868a26ea94b007a6673059d56ca2adfdf2b Mon Sep 17 00:00:00 2001 From: Josuah Demangeon Date: Sun, 8 Oct 2023 10:17:29 -0400 Subject: [PATCH] tools/upload_file.py: turn into a class --- tools/upload_file.py | 174 +++++++++++++++++++++---------------------- 1 file changed, 86 insertions(+), 88 deletions(-) diff --git a/tools/upload_file.py b/tools/upload_file.py index bfc68560..361e3339 100644 --- a/tools/upload_file.py +++ b/tools/upload_file.py @@ -13,108 +13,106 @@ from bleak.uuids import register_uuids -UART_SERVICE_UUID = "6e400001-b5a3-f393-e0a9-e50e24dcca9e" -UART_RX_CHAR_UUID = "6e400002-b5a3-f393-e0a9-e50e24dcca9e" -UART_TX_CHAR_UUID = "6e400003-b5a3-f393-e0a9-e50e24dcca9e" - -DATA_SERVICE_UUID = "e5700001-7bac-429a-b4ce-57ff900f479d" -DATA_RX_CHAR_UUID = "e5700002-7bac-429a-b4ce-57ff900f479d" -DATA_TX_CHAR_UUID = "e5700003-7bac-429a-b4ce-57ff900f479d" - -register_uuids({ - DATA_SERVICE_UUID: "Monocle Raw Serivce", - DATA_TX_CHAR_UUID: "Monocle Raw RX", - DATA_RX_CHAR_UUID: "Monocle Raw TX", -}) - -repl_rx_event = asyncio.Event() -repl_rx_last = None -data_rx_event = asyncio.Event() -repl_rx_last = None - -async def upload_file(file): - - def match_repl_uuid(device: BLEDevice, adv: AdvertisementData): +class MonocleScript: + UART_SERVICE_UUID = "6e400001-b5a3-f393-e0a9-e50e24dcca9e" + UART_RX_CHAR_UUID = "6e400002-b5a3-f393-e0a9-e50e24dcca9e" + UART_TX_CHAR_UUID = "6e400003-b5a3-f393-e0a9-e50e24dcca9e" + + DATA_SERVICE_UUID = "e5700001-7bac-429a-b4ce-57ff900f479d" + DATA_RX_CHAR_UUID = "e5700002-7bac-429a-b4ce-57ff900f479d" + DATA_TX_CHAR_UUID = "e5700003-7bac-429a-b4ce-57ff900f479d" + + def __init__(self): + register_uuids({ + self.DATA_SERVICE_UUID: "Monocle Data Serivce", + self.DATA_TX_CHAR_UUID: "Monocle Data RX", + self.DATA_RX_CHAR_UUID: "Monocle Data TX", + }) + + @classmethod + async def run(cls, *args): + self = cls() + + device = await BleakScanner.find_device_by_filter(self.match_uart_uuid) + if device is None: + print("no matching device found\n") + exit(1) + print("Connected to the Monocle") + + async with BleakClient(device, disconnected_callback=self.handle_disconnect) as c: + self.client = c + await self.init_uart_service() + await self.init_data_service() + await self.set_monocle_raw_mode() + await self.script(*args) + await self.client.write_gatt_char(self.uart_rx_char, b"\x02") + + def match_uart_uuid(self, device:BLEDevice, adv:AdvertisementData): print(f"uuids={adv.service_uuids}") - return UART_SERVICE_UUID.lower() in adv.service_uuids + return self.UART_SERVICE_UUID.lower() in adv.service_uuids - def handle_disconnect(_: BleakClient): + def handle_disconnect(self, _:BleakClient): print("\nDevice was disconnected.") - for task in asyncio.all_tasks(): - task.cancel() + exit(1) - def handle_repl_tx(_: BleakGATTCharacteristic, data: bytearray): + def handle_uart_tx(self, _:BleakGATTCharacteristic, data:bytearray): # Here, handle data sent by the Monocle with `print()` - global repl_rx_last - repl_rx_last = data - repl_rx_event.set() + self.uart_rx_last = data + self.uart_rx_event.set() - def handle_data_tx(_: BleakGATTCharacteristic, data: bytearray): + def handle_data_tx(self, _:BleakGATTCharacteristic, data:bytearray): # Here, handle data sent by the Monocle with `bluetooth.send()` - global data_rx_last - data_rx_last = data - data_rx_event.set() - - device = await BleakScanner.find_device_by_filter(match_repl_uuid) - if device is None: - print("no matching device found\n") - exit(1) - - async def send_command(client, cmd): - global repl_rx_event - global repl_rx_last - - await client.write_gatt_char(repl_rx_char, cmd) - await repl_rx_event.wait() - repl_rx_event.clear() - print(repl_rx_last) - assert repl_rx_last[0:2] == b'OK' - - print("Connected to the Monocle") - - async with BleakClient(device, disconnected_callback=handle_disconnect) as client: - await client.start_notify(UART_TX_CHAR_UUID, handle_repl_tx) - await client.start_notify(DATA_TX_CHAR_UUID, handle_data_tx) - - repl_service = client.services.get_service(UART_SERVICE_UUID) - data_service = client.services.get_service(DATA_SERVICE_UUID) - repl_rx_char = repl_service.get_characteristic(UART_RX_CHAR_UUID) - data_rx_char = data_service.get_characteristic(DATA_RX_CHAR_UUID) - - # Example application: upload a file to the Monocle - loop = asyncio.get_running_loop() - - global repl_rx_event - global repl_rx_last - - print(">>> switching the terminal to raw mode") - await client.write_gatt_char(repl_rx_char, b'\x01') - await repl_rx_event.wait() - await asyncio.sleep(1) - repl_rx_event.clear() - print(repl_rx_last) + self.data_rx_last = data + self.data_rx_event.set() + + async def send_command(self, cmd): + await self.client.write_gatt_char(self.uart_rx_char, cmd.encode('ascii') + b'\x04') + await self.uart_rx_event.wait() + self.uart_rx_event.clear() + print(self.uart_rx_last) + assert self.uart_rx_last[0:2] == b'OK' + + async def init_uart_service(self): + await self.client.start_notify(self.UART_TX_CHAR_UUID, self.handle_uart_tx) + uart_service = self.client.services.get_service(self.UART_SERVICE_UUID) + self.uart_rx_char = uart_service.get_characteristic(self.UART_RX_CHAR_UUID) + self.uart_rx_event = asyncio.Event() + self.uart_rx_last = None + + async def init_data_service(self): + await self.client.start_notify(self.DATA_TX_CHAR_UUID, self.handle_data_tx) + data_service = self.client.services.get_service(self.DATA_SERVICE_UUID) + self.data_rx_char = data_service.get_characteristic(self.DATA_RX_CHAR_UUID) + self.data_rx_event = asyncio.Event() + self.data_rx_last = None + + async def set_monocle_raw_mode(self): + await self.client.write_gatt_char(self.uart_rx_char, b'\x01') + await self.uart_rx_event.wait() + await asyncio.sleep(0.5) + self.uart_rx_event.clear() + + +class UploadFileScript(MonocleScript): + """ + Example application: upload a file to the Monocle + """ + async def script(self, file): print(">>> opening a file to write to") - await send_command(client, ''f"f = open('{file}', 'wb')\x04".encode("ascii")) + await self.send_command(f"f = open('{file}', 'wb')") print(">>> writing the data to the file") with open(file, 'rb') as f: - size = repl_rx_char.max_write_without_response_size - len("f.write(r'''''')\x04") - while data := f.read(size): - sep = b"'''" if b'"""' in data else b'"""' - await send_command(client, b"f.write(r" + sep + data + sep + b")\x04") + while data := f.read(100): + print(data) + await self.send_command(f"f.write({bytes(data).__repr__()})") print(">>> closing the file") - await send_command(client, b"f.close()\x04") - - print(">>> switch back to friendly mode") - await client.write_gatt_char(repl_rx_char, b"\x02") + await self.send_command("f.close()") + + print(">>> script done") - print(">>> upload done!") if __name__ == "__main__": - file = sys.argv[1] - try: - asyncio.run(upload_file(file)) - except asyncio.CancelledError: - pass + asyncio.run(UploadFileScript.run(sys.argv[1]))