Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tools: add upload_file.py that uploads a file under the same path #268

Merged
merged 1 commit into from
Sep 29, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 120 additions & 0 deletions tools/upload_file.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
#!/usr/bin/env python3
"""
An example showing how to connect to the Monocle and store a file.
"""

import asyncio
import sys

from bleak import BleakClient, BleakScanner
from bleak.backends.characteristic import BleakGATTCharacteristic
from bleak.backends.device import BLEDevice
from bleak.backends.scanner import AdvertisementData
from bleak.uuids import register_uuids


UART_SERVICE_UUID = "6e400001-b5a3-f393-e0a9-e50e24dcca9e"
UART_RX_CHAR_UUID = "6e400002-b5a3-f393-e0a9-e50e24dcca9e"
UART_TX_CHAR_UUID = "6e400003-b5a3-f393-e0a9-e50e24dcca9e"

DATA_SERVICE_UUID = "e5700001-7bac-429a-b4ce-57ff900f479d"
DATA_RX_CHAR_UUID = "e5700002-7bac-429a-b4ce-57ff900f479d"
DATA_TX_CHAR_UUID = "e5700003-7bac-429a-b4ce-57ff900f479d"

register_uuids({
DATA_SERVICE_UUID: "Monocle Raw Serivce",
DATA_TX_CHAR_UUID: "Monocle Raw RX",
DATA_RX_CHAR_UUID: "Monocle Raw TX",
})

repl_rx_event = asyncio.Event()
repl_rx_last = None
data_rx_event = asyncio.Event()
repl_rx_last = None

async def upload_file(file):

def match_repl_uuid(device: BLEDevice, adv: AdvertisementData):
print(f"uuids={adv.service_uuids}")
return UART_SERVICE_UUID.lower() in adv.service_uuids

def handle_disconnect(_: BleakClient):
print("\nDevice was disconnected.")
for task in asyncio.all_tasks():
task.cancel()

def handle_repl_tx(_: BleakGATTCharacteristic, data: bytearray):
# Here, handle data sent by the Monocle with `print()`
global repl_rx_last
repl_rx_last = data
repl_rx_event.set()

def handle_data_tx(_: BleakGATTCharacteristic, data: bytearray):
# Here, handle data sent by the Monocle with `bluetooth.send()`
global data_rx_last
data_rx_last = data
data_rx_event.set()

device = await BleakScanner.find_device_by_filter(match_repl_uuid)
if device is None:
print("no matching device found\n")
exit(1)

async def send_command(client, cmd):
global repl_rx_event
global repl_rx_last

await client.write_gatt_char(repl_rx_char, cmd)
await repl_rx_event.wait()
repl_rx_event.clear()
print(repl_rx_last)
assert repl_rx_last[0:2] == b'OK'

print("Connected to the Monocle")

async with BleakClient(device, disconnected_callback=handle_disconnect) as client:
await client.start_notify(UART_TX_CHAR_UUID, handle_repl_tx)
await client.start_notify(DATA_TX_CHAR_UUID, handle_data_tx)

repl_service = client.services.get_service(UART_SERVICE_UUID)
data_service = client.services.get_service(DATA_SERVICE_UUID)
repl_rx_char = repl_service.get_characteristic(UART_RX_CHAR_UUID)
data_rx_char = data_service.get_characteristic(DATA_RX_CHAR_UUID)

# Example application: upload a file to the Monocle
loop = asyncio.get_running_loop()

global repl_rx_event
global repl_rx_last

print(">>> switching the terminal to raw mode")
await client.write_gatt_char(repl_rx_char, b'\x01')
await repl_rx_event.wait()
await asyncio.sleep(1)
repl_rx_event.clear()
print(repl_rx_last)

print(">>> opening a file to write to")
await send_command(client, ''f"f = open('{file}', 'wb')\x04".encode("ascii"))

print(">>> writing the data to the file")
with open(file, 'rb') as f:
size = repl_rx_char.max_write_without_response_size - len("f.write(r'''''')\x04")
while data := f.read(size):
sep = b"'''" if b'"""' in data else b'"""'
await send_command(client, b"f.write(r" + sep + data + sep + b")\x04")

print(">>> closing the file")
await send_command(client, b"f.close()\x04")

print(">>> switch back to friendly mode")
await client.write_gatt_char(repl_rx_char, b"\x02")

print(">>> upload done!")

if __name__ == "__main__":
file = sys.argv[1]
try:
asyncio.run(upload_file(file))
except asyncio.CancelledError:
pass