From feb42e67e945b98c79a4c523bca8827d2197a08e Mon Sep 17 00:00:00 2001 From: schneider <schneider@blinkenlichts.net> Date: Sun, 19 Apr 2020 23:14:53 +0200 Subject: [PATCH] feat(tools): Tool to upload a file via BLE --- tools/card10-ble-transfer.py | 132 +++++++++++++++++++++++++++++++++++ 1 file changed, 132 insertions(+) create mode 100755 tools/card10-ble-transfer.py diff --git a/tools/card10-ble-transfer.py b/tools/card10-ble-transfer.py new file mode 100755 index 000000000..3a9031130 --- /dev/null +++ b/tools/card10-ble-transfer.py @@ -0,0 +1,132 @@ +#!/usr/bin/env python3 + +import bluepy +import time +import binascii +import struct +import sys +import argparse +import tqdm +import os + + +rx_done = False +rx_data = None + + +def main() -> None: + global rx_done, rx_data + parser = argparse.ArgumentParser( + description="""\ +Transfer a file to a card10 using Bluetooth Low Energy. +""" + ) + + parser.add_argument( + "-d", + "--directory", + help="Target directory on the card10. Root directory by default", + ) + parser.add_argument("-s", "--silent", help="Don't print status information") + parser.add_argument( + "-m", "--mtu", type=int, default=96, help="MTU to use. Default 96 bytes" + ) + parser.add_argument( + "mac", help="BT MAC address of the card10. Format: CA:4D:10:XX:XX:XX" + ) + parser.add_argument("file", help="file to transfer") + + args = parser.parse_args() + + t0 = time.time() + p = bluepy.btle.Peripheral(args.mac) + + mtu = args.mtu + p.setMTU(mtu) + chunk_size = mtu - 8 + + s = p.getServiceByUUID("42230100-2342-2342-2342-234223422342") + tx = s.getCharacteristics("42230101-2342-2342-2342-234223422342")[0] + rx = s.getCharacteristics("42230102-2342-2342-2342-234223422342")[0] + + print("Connection setup time:", int(time.time() - t0), "seconds") + + class MyDelegate(bluepy.btle.DefaultDelegate): + def __init__(self): + bluepy.btle.DefaultDelegate.__init__(self) + + def handleNotification(self, cHandle, data): + global rx_done, rx_data + rx_data = data + rx_done = True + + def check_crc(data, crc): + our_crc = binascii.crc32(data) + return crc == struct.pack(">I", our_crc) + + p.setDelegate(MyDelegate()) + + if args.directory is not None: + filename = args.directory + "/" + args.file + else: + filename = args.file + print("Target filename:", filename) + + file_size = os.path.getsize(args.file) + t = tqdm.tqdm(total=file_size, unit="bytes") + + start = b"s" + bytes(filename, "ASCII") + tx.write(start) + p.waitForNotifications(10.0) + if not rx_done: + print("No reply") + sys.exit(1) + + if not (rx_data[0:1] == b"S" and check_crc(start, rx_data[1:])): + print("Filename not acknowledged") + return + + with open(args.file) as f: + offset = 0 + # t0 = time.time() + while True: + payload = bytes(f.read(chunk_size), "UTF-8") + if len(payload) == 0: + break + + chunk = b"c" + struct.pack(">I", offset) + payload + # print((int)((time.time() - t0) * 1000)) + # t0 = time.time() + tx.write(chunk) + p.waitForNotifications(10.0) + if not rx_done: + print("No reply") + # TODO: Handle retries + sys.exit(1) + + if not (rx_data[0:1] == b"C" and check_crc(chunk, rx_data[1:])): + print("Chunk not acknowledged") + break + + t.update(len(payload)) + offset += len(payload) + + if len(payload) < chunk_size: + break + + finish = b"f" + tx.write(finish) + p.waitForNotifications(10.0) + if not rx_done: + print("No reply") + sys.exit(1) + + if not rx_data[0:1] == b"F": + print("Finish not acknowledged") + + t.update(0) + t.close() + + +if __name__ == "__main__": + main() -- GitLab