Skip to content

Commit

Permalink
Merge pull request #270 from brilliantlabsAR/upload_file_class
Browse files Browse the repository at this point in the history
tools/upload_file.py: turn into a class
  • Loading branch information
siliconwitch authored Oct 9, 2023
2 parents 2b0ea1f + a147786 commit e9ab02d
Showing 1 changed file with 86 additions and 88 deletions.
174 changes: 86 additions & 88 deletions tools/upload_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,108 +13,106 @@
from bleak.uuids import register_uuids


UART_SERVICE_UUID = "6e400001-b5a3-f393-e0a9-e50e24dcca9e"
UART_RX_CHAR_UUID = "6e400002-b5a3-f393-e0a9-e50e24dcca9e"
UART_TX_CHAR_UUID = "6e400003-b5a3-f393-e0a9-e50e24dcca9e"

DATA_SERVICE_UUID = "e5700001-7bac-429a-b4ce-57ff900f479d"
DATA_RX_CHAR_UUID = "e5700002-7bac-429a-b4ce-57ff900f479d"
DATA_TX_CHAR_UUID = "e5700003-7bac-429a-b4ce-57ff900f479d"

register_uuids({
DATA_SERVICE_UUID: "Monocle Raw Serivce",
DATA_TX_CHAR_UUID: "Monocle Raw RX",
DATA_RX_CHAR_UUID: "Monocle Raw TX",
})

repl_rx_event = asyncio.Event()
repl_rx_last = None
data_rx_event = asyncio.Event()
repl_rx_last = None

async def upload_file(file):

def match_repl_uuid(device: BLEDevice, adv: AdvertisementData):
class MonocleScript:
UART_SERVICE_UUID = "6e400001-b5a3-f393-e0a9-e50e24dcca9e"
UART_RX_CHAR_UUID = "6e400002-b5a3-f393-e0a9-e50e24dcca9e"
UART_TX_CHAR_UUID = "6e400003-b5a3-f393-e0a9-e50e24dcca9e"

DATA_SERVICE_UUID = "e5700001-7bac-429a-b4ce-57ff900f479d"
DATA_RX_CHAR_UUID = "e5700002-7bac-429a-b4ce-57ff900f479d"
DATA_TX_CHAR_UUID = "e5700003-7bac-429a-b4ce-57ff900f479d"

def __init__(self):
register_uuids({
self.DATA_SERVICE_UUID: "Monocle Data Serivce",
self.DATA_TX_CHAR_UUID: "Monocle Data RX",
self.DATA_RX_CHAR_UUID: "Monocle Data TX",
})

@classmethod
async def run(cls, *args):
self = cls()

device = await BleakScanner.find_device_by_filter(self.match_uart_uuid)
if device is None:
print("no matching device found\n")
exit(1)
print("Connected to the Monocle")

async with BleakClient(device, disconnected_callback=self.handle_disconnect) as c:
self.client = c
await self.init_uart_service()
await self.init_data_service()
await self.set_monocle_raw_mode()
await self.script(*args)
await self.client.write_gatt_char(self.uart_rx_char, b"\x02")

def match_uart_uuid(self, device:BLEDevice, adv:AdvertisementData):
print(f"uuids={adv.service_uuids}")
return UART_SERVICE_UUID.lower() in adv.service_uuids
return self.UART_SERVICE_UUID.lower() in adv.service_uuids

def handle_disconnect(_: BleakClient):
def handle_disconnect(self, _:BleakClient):
print("\nDevice was disconnected.")
for task in asyncio.all_tasks():
task.cancel()
exit(1)

def handle_repl_tx(_: BleakGATTCharacteristic, data: bytearray):
def handle_uart_tx(self, _:BleakGATTCharacteristic, data:bytearray):
# Here, handle data sent by the Monocle with `print()`
global repl_rx_last
repl_rx_last = data
repl_rx_event.set()
self.uart_rx_last = data
self.uart_rx_event.set()

def handle_data_tx(_: BleakGATTCharacteristic, data: bytearray):
def handle_data_tx(self, _:BleakGATTCharacteristic, data:bytearray):
# Here, handle data sent by the Monocle with `bluetooth.send()`
global data_rx_last
data_rx_last = data
data_rx_event.set()

device = await BleakScanner.find_device_by_filter(match_repl_uuid)
if device is None:
print("no matching device found\n")
exit(1)

async def send_command(client, cmd):
global repl_rx_event
global repl_rx_last

await client.write_gatt_char(repl_rx_char, cmd)
await repl_rx_event.wait()
repl_rx_event.clear()
print(repl_rx_last)
assert repl_rx_last[0:2] == b'OK'

print("Connected to the Monocle")

async with BleakClient(device, disconnected_callback=handle_disconnect) as client:
await client.start_notify(UART_TX_CHAR_UUID, handle_repl_tx)
await client.start_notify(DATA_TX_CHAR_UUID, handle_data_tx)

repl_service = client.services.get_service(UART_SERVICE_UUID)
data_service = client.services.get_service(DATA_SERVICE_UUID)
repl_rx_char = repl_service.get_characteristic(UART_RX_CHAR_UUID)
data_rx_char = data_service.get_characteristic(DATA_RX_CHAR_UUID)

# Example application: upload a file to the Monocle
loop = asyncio.get_running_loop()

global repl_rx_event
global repl_rx_last

print(">>> switching the terminal to raw mode")
await client.write_gatt_char(repl_rx_char, b'\x01')
await repl_rx_event.wait()
await asyncio.sleep(1)
repl_rx_event.clear()
print(repl_rx_last)
self.data_rx_last = data
self.data_rx_event.set()

async def send_command(self, cmd):
await self.client.write_gatt_char(self.uart_rx_char, cmd.encode('ascii') + b'\x04')
await self.uart_rx_event.wait()
self.uart_rx_event.clear()
print(self.uart_rx_last)
assert self.uart_rx_last[0:2] == b'OK'

async def init_uart_service(self):
await self.client.start_notify(self.UART_TX_CHAR_UUID, self.handle_uart_tx)
uart_service = self.client.services.get_service(self.UART_SERVICE_UUID)
self.uart_rx_char = uart_service.get_characteristic(self.UART_RX_CHAR_UUID)
self.uart_rx_event = asyncio.Event()
self.uart_rx_last = None

async def init_data_service(self):
await self.client.start_notify(self.DATA_TX_CHAR_UUID, self.handle_data_tx)
data_service = self.client.services.get_service(self.DATA_SERVICE_UUID)
self.data_rx_char = data_service.get_characteristic(self.DATA_RX_CHAR_UUID)
self.data_rx_event = asyncio.Event()
self.data_rx_last = None

async def set_monocle_raw_mode(self):
await self.client.write_gatt_char(self.uart_rx_char, b'\x01')
await self.uart_rx_event.wait()
await asyncio.sleep(0.5)
self.uart_rx_event.clear()


class UploadFileScript(MonocleScript):
"""
Example application: upload a file to the Monocle
"""
async def script(self, file):

print(">>> opening a file to write to")
await send_command(client, ''f"f = open('{file}', 'wb')\x04".encode("ascii"))
await self.send_command(f"f = open('{file}', 'wb')")

print(">>> writing the data to the file")
with open(file, 'rb') as f:
size = repl_rx_char.max_write_without_response_size - len("f.write(r'''''')\x04")
while data := f.read(size):
sep = b"'''" if b'"""' in data else b'"""'
await send_command(client, b"f.write(r" + sep + data + sep + b")\x04")
while data := f.read(100):
print(data)
await self.send_command(f"f.write({bytes(data).__repr__()})")

print(">>> closing the file")
await send_command(client, b"f.close()\x04")

print(">>> switch back to friendly mode")
await client.write_gatt_char(repl_rx_char, b"\x02")
await self.send_command("f.close()")

print(">>> script done")

print(">>> upload done!")

if __name__ == "__main__":
file = sys.argv[1]
try:
asyncio.run(upload_file(file))
except asyncio.CancelledError:
pass
asyncio.run(UploadFileScript.run(sys.argv[1]))

0 comments on commit e9ab02d

Please sign in to comment.