Skip to content
Snippets Groups Projects
Commit 48a00c8c authored by schneider's avatar schneider
Browse files

feat(ble): Add exposure notification scanner app

parent 1e9e07a9
No related branches found
No related tags found
1 merge request!392Show COVID-19 exposure notification statistics
import interrupt
import sys_ble
import time
import vibra
import display
import color
DM_ADV_TYPE_FLAGS = 0x01
DM_ADV_TYPE_16_UUID = 0x03
DM_ADV_TYPE_SERVICE_DATA = 0x16
UUID = b"\x6f\xfd"
TIMEOUT = 100
seen = {}
def parse_advertisement_data(data):
ads = {}
l = len(data)
p = 0
while p < l:
ad_len = data[p]
p += 1
if ad_len > 0:
ad_type = data[p]
ad_data = b""
p += 1
if ad_len > 1:
ad_data = data[p : p + ad_len - 1]
p += ad_len - 1
ads[ad_type] = ad_data
return ads
def bytes2hex(bin, sep=""):
return sep.join(["%02x" % x for x in bin])
def process_covid_dada(mac, service_data, rssi, flags):
vibra.vibrate(10)
print(bytes2hex(mac, ":"), rssi, bytes2hex(service_data), flags)
# try to produce a small int
seen[mac] = [int(time.time() - t0), flags]
def prune():
global seen
seen_pruned = {}
now = time.time() - t0
for mac in seen:
if seen[mac][0] + TIMEOUT > now:
seen_pruned[mac] = seen[mac]
seen = seen_pruned
def process_scan_report(scan_report):
ads = parse_advertisement_data(scan_report[0])
mac = scan_report[4]
mac = bytes([mac[5], mac[4], mac[3], mac[2], mac[1], mac[0]])
rssi = scan_report[1]
# print(bytes2hex(mac, ':'), rssi, bytes2hex(scan_report[0]), ads)
# According to spec there is no other service announced and the
# service is always listed in the complete list of 16 bit services
if DM_ADV_TYPE_16_UUID in ads:
if ads[DM_ADV_TYPE_16_UUID] == UUID:
if DM_ADV_TYPE_SERVICE_DATA in ads:
flags = None
if DM_ADV_TYPE_FLAGS in ads:
flags = ads[DM_ADV_TYPE_FLAGS]
# service data contains another copy of the service UUID
process_covid_dada(mac, ads[DM_ADV_TYPE_SERVICE_DATA][2:], rssi, flags)
def ble_callback(_):
event = sys_ble.get_event()
prune()
while True:
scan_report = sys_ble.get_scan_report()
if scan_report == None:
return
process_scan_report(scan_report)
t0 = time.time()
disp = display.open()
interrupt.set_callback(interrupt.BLE, ble_callback)
interrupt.enable_callback(interrupt.BLE)
sys_ble.scan_start()
while True:
seen_google = 0
seen_apple = 0
t = time.time() - t0
t_min = t
for mac in seen:
if seen[mac][1]:
seen_apple += 1
else:
seen_google += 1
if seen[mac][0] < t_min:
t_min = seen[mac][0]
seen_total = seen_google + seen_apple
window = t - t_min
disp.clear()
disp.print("Last %u s:" % window, posy=0, fg=color.WHITE)
disp.print("Google: %d" % seen_google, posy=20, fg=color.GREEN)
disp.print("Apple: %d" % seen_apple, posy=40, fg=color.BLUE)
disp.print("Total: %d" % seen_total, posy=60, fg=color.WHITE)
disp.update()
# print(seen)
time.sleep(1)
{"author": "schneider", "name": "Exposure Notification Counter", "description": "Count exposure notifications received via BLE", "category": "Hardware", "revision": 1, "source":"preload"}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment