Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tools/upload_file.py: implement getline() #271

Merged
merged 2 commits into from
Oct 20, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 62 additions & 37 deletions tools/upload_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import asyncio
import sys
import os

from bleak import BleakClient, BleakScanner
from bleak.backends.characteristic import BleakGATTCharacteristic
Expand All @@ -29,6 +30,7 @@ def __init__(self):
self.DATA_RX_CHAR_UUID: "Monocle Data TX",
})


@classmethod
async def run(cls, *args):
self = cls()
Expand All @@ -37,7 +39,6 @@ async def run(cls, *args):
if device is None:
print("no matching device found\n")
exit(1)
print("Connected to the Monocle")

async with BleakClient(device, disconnected_callback=self.handle_disconnect) as c:
self.client = c
Expand All @@ -47,72 +48,96 @@ async def run(cls, *args):
await self.script(*args)
await self.client.write_gatt_char(self.uart_rx_char, b"\x02")

def log(self, msg):
if "DEBUG" in os.environ:
print(msg, flush=True)

def match_uart_uuid(self, device:BLEDevice, adv:AdvertisementData):
print(f"uuids={adv.service_uuids}")
self.log(f"uuids={adv.service_uuids}")
return self.UART_SERVICE_UUID.lower() in adv.service_uuids

def handle_disconnect(self, _:BleakClient):
print("\nDevice was disconnected.")
exit(1)
self.log("Device was disconnected.")
for task in asyncio.all_tasks():
task.cancel()

def handle_uart_tx(self, _:BleakGATTCharacteristic, data:bytearray):
def handle_uart_rx(self, _:BleakGATTCharacteristic, data:bytearray):
# Here, handle data sent by the Monocle with `print()`
self.uart_rx_last = data
self.uart_rx_event.set()
self.uart_rx_buf.extend(data)

def handle_data_tx(self, _:BleakGATTCharacteristic, data:bytearray):
def handle_data_rx(self, _:BleakGATTCharacteristic, data:bytearray):
# Here, handle data sent by the Monocle with `bluetooth.send()`
self.data_rx_last = data
self.data_rx_event.set()
self.data_rx_buf.extend(data)

async def getchar_uart(self):
while len(self.uart_rx_buf) == 0:
await asyncio.sleep(0.01)
c = self.uart_rx_buf[0]
del self.uart_rx_buf[0]
return c

async def getchar_data(self):
while len(self.data_rx_buf) == 0:
await asyncio.sleep(0.01)
c = self.data_rx_buf[0]
del self.data_rx_buf[0]
return c

async def getline_uart(self, delim=b"\n"):
buf = bytearray()
while not (c := await self.getchar_uart()) in delim:
buf.append(c)
return buf

async def getline_data(self, delim="\n"):
buf = bytearray()
while not (c := await self.getchar_uart()) in delim:
buf.append(c)
return buf

async def send_command(self, cmd):
await self.client.write_gatt_char(self.uart_rx_char, cmd.encode('ascii') + b'\x04')
await self.uart_rx_event.wait()
self.uart_rx_event.clear()
print(self.uart_rx_last)
assert self.uart_rx_last[0:2] == b'OK'
await self.client.write_gatt_char(self.uart_rx_char, cmd.encode("ascii") + b"\x04")
while True:
resp = await self.getline_uart(delim=b"\r\n\x04")
if resp != b"" and resp != b">":
break
assert resp == b">OK"

async def init_uart_service(self):
await self.client.start_notify(self.UART_TX_CHAR_UUID, self.handle_uart_tx)
await self.client.start_notify(self.UART_TX_CHAR_UUID, self.handle_uart_rx)
uart_service = self.client.services.get_service(self.UART_SERVICE_UUID)
self.uart_rx_char = uart_service.get_characteristic(self.UART_RX_CHAR_UUID)
self.uart_rx_event = asyncio.Event()
self.uart_rx_last = None
self.uart_rx_buf = bytearray()

async def init_data_service(self):
await self.client.start_notify(self.DATA_TX_CHAR_UUID, self.handle_data_tx)
await self.client.start_notify(self.DATA_TX_CHAR_UUID, self.handle_data_rx)
data_service = self.client.services.get_service(self.DATA_SERVICE_UUID)
self.data_rx_char = data_service.get_characteristic(self.DATA_RX_CHAR_UUID)
self.data_rx_event = asyncio.Event()
self.data_rx_last = None

async def set_monocle_raw_mode(self):
await self.client.write_gatt_char(self.uart_rx_char, b'\x01')
await self.uart_rx_event.wait()
await asyncio.sleep(0.5)
self.uart_rx_event.clear()
await self.client.write_gatt_char(self.uart_rx_char, b"\x01 \x04")
while await self.getline_uart(delim=b"\r\n\x04") != b">OK":
pass


class UploadFileScript(MonocleScript):
"""
Example application: upload a file to the Monocle
"""
async def script(self, file):

print(">>> opening a file to write to")
print(f"uploading {file} ", end="")
await self.send_command(f"f = open('{file}', 'wb')")

print(">>> writing the data to the file")
with open(file, 'rb') as f:
with open(file, "rb") as f:
while data := f.read(100):
print(data)
print(end=".", flush=True)
await self.send_command(f"f.write({bytes(data).__repr__()})")

print(">>> closing the file")
await self.send_command("f.close()")

print(">>> script done")

print(" done")

if __name__ == "__main__":
asyncio.run(UploadFileScript.run(sys.argv[1]))
for file in sys.argv[1:]:
try:
asyncio.run(UploadFileScript.run(file))
except asyncio.exceptions.CancelledError:
pass