Show COVID-19 exposure notification statistics
Based on !390 (merged)
Displays how many unique MACs (aka phones) which send exposure notifications have been seen during the last 100 seconds.
It tries to distinguish between Apple and Google devices based on a heuristic mentioned in https://github.com/znuh/microbit-corona-scanner. The user can choose if they want the LEDs and/or the vibration motor to show newly discovered MACs / exposure notifications.
Loosely based on:
- https://github.com/kauzu/CoronaAppDetector
- https://github.com/frankrieger/CoronaAppDetectorM5StickC
- https://github.com/znuh/microbit-corona-scanner
Most of the logic resides in a plain Python script and can be modified by the user as needed.
Merge request reports
Activity
added 19 commits
- 43d40861 - feat(interrupts): Function to check if an int is enabled
- eaf47ff4 - refact(ble): Move numeric comparison into our code base
- 85627336 - chore(ble): Remove unused cordio UI code
- bace0f82 - feat(ble): pairing dialog in bluetooth app
- 93f50e78 - fix(ble): Update advertising data even when it is empty
- 8458ce6e - fix(ble): Remove wrong adv timing entry
- 25626ba3 - fix(ble): Correctly switch between ADV modes
- 1c4e9678 - feat(ble): Add pretty event printing
- b2d78fea - fix(config): Fix wrong debug print
- 7e8f17f0 - feat(ble): Better bonding UI
- e86c7b6e - refact(ble): Simplify bonding storage
- 3d5789af - fix(ble): Keep advertising in slected mode after connection closes
- 0f23df0a - fix(ble): Go back to main.py after bonding
- 8b226993 - feat(ble): Only be discoverble in the BLE dialog
- 7ea488c6 - feat(ble): Appear as generic watch, add card10 UUID to advertisement
- 6b4a975e - feat(ble): epicordium support for retrieving scan reports
- 8a390db4 - feat(ble): pycardium support for scanning
- faadebd8 - fix(ble): Dont report missed scan events
- 9f708cda - feat(ble): Add exposure notification scanner app
Toggle commit listadded 15 commits
- 7c16c31f - initial try blink_rocket
- de289cfd - changed blink_timer[] init
- c9de9e71 - bug: blink also on first invocation
- 539c7fb8 - documentation added
- 0f67e1d3 - removed #define
- 039066d8 - for annoyatron
- cb11deac - Apply suggestion to pycardium/modules/sys_leds.c
- 2df72a3b - rocket_timer_callback
- beafdeea - Merge branch 'blink_multi_rocket' of...
- 715aac63 - Merge branch 'master' into blink_multi_rocket
- e14ba12a - use static timer for blink rockets
- 9d9a1c57 - lint: interchanged spaces with tabs
- 3527b3e9 - rename: blink→flash
- e15ee7a7 - Merge branch 'zenox/firmware-blink_multi_rocket' into schneider/covid-tracing
- 3df5b338 - feat(ble): More options for exp. not. counter
Toggle commit listadded 1 commit
- 16a372f0 - feat(ble): More options for exp. not. counter
added 1 commit
- 29ab124a - feat(ble): More options for exp. not. counter
added 1 commit
- f06865ce - feat(ble): More options for exp. not. counter
added 1 commit
- 560b550b - chore(preload): Rename Exposure Notification stats app folder name
added 1 commit
- e47710f6 - doc(preload): Update Exposure Notification app meta data
I'm currently in the process of implementing saving of exposure notifications to flash. I don't follow the exact procedure that Google has (no 4s accumulation but accumulation for each mac and save count, first and last contact for each mac and only write to flash if we haven't seen this mac for a set amount of time). Once I have my proof of concept working I will post a patch here (or is there a good way to do a PR for a PR?)
The following patch saves all Data that is required to create an exposure risk statistic afterwards on a device with internet connection. Thresholds/Timeouts are probably not ideal for real world use yet and one important but missing feature is to save all data in seen (even if it hasn't been pruned yet) to file on exit of the app.
diff --git a/preload/apps/exnostat/__init__.py b/preload/apps/exnostat/__init__.py index 5ad9afc1..d56c9148 100644 --- a/preload/apps/exnostat/__init__.py +++ b/preload/apps/exnostat/__init__.py @@ -7,12 +7,16 @@ import color import buttons import leds import config +import struct DM_ADV_TYPE_FLAGS = 0x01 DM_ADV_TYPE_16_UUID = 0x03 DM_ADV_TYPE_SERVICE_DATA = 0x16 UUID = b"\x6f\xfd" -TIMEOUT = 100 +TIMEOUT = 120 +WRITE_TIMEOUT = 30 +WRITE_THRESHOLD = 10 +FILENAME = "exno.bin" MODE_OFF = 0 MODE_ON_NEW_MAC = 1 @@ -20,6 +24,8 @@ MODE_ON_RX = 2 MODE_BOTH = 3 seen = {} +seen_towrite = {} +last_towrite_add = 0 vib_mode = MODE_BOTH led_mode = MODE_BOTH @@ -61,24 +67,63 @@ def process_covid_data(mac, service_data, rssi, flags): if led_mode in [MODE_ON_NEW_MAC, MODE_BOTH] and mac not in seen: leds.flash_rocket(1, 31, 200) - print(bytes2hex(mac, ":"), rssi, bytes2hex(service_data), flags) # try to produce a small int last_rx_time = time.time() - t0 - seen[mac] = [int(last_rx_time), flags] + if mac in seen: + print(bytes2hex(mac, ":"), rssi, bytes2hex(service_data), flags, seen[mac][1]+1) + seen[mac][0] = int(last_rx_time) + seen[mac][1] += 1 # increase counter + seen[mac][2][0] = max(seen[mac][2][0], rssi) + seen[mac][2][1] = (seen[mac][2][1] + rssi)/2 + seen[mac][2][2] = min(seen[mac][2][2], rssi) + seen[mac][3][1] = time.unix_time() + else: + print(bytes2hex(mac, ":"), rssi, bytes2hex(service_data), flags, "1") + # The elements are + # - last rx time + # - seen count + # - rssi + # - max + # - avg + # - min + # - timestamps + # - first seen + # - last seen + # - flags + # - service data + seen[mac] = [int(last_rx_time), 1, [rssi, rssi, rssi], [time.unix_time(), time.unix_time()], flags, service_data] + + +def write_to_file(data): + print("Writing", len(data), "seen MACs into flash...") + with open(FILENAME, "ab") as outfile: + for mac in data: + outfile.write(struct.pack("<6sHiiiQQ20s", mac, data[mac][1], int(data[mac][2][0]*100), int(data[mac][2][1]*100), int(data[mac][2][2]*100), data[mac][3][0], data[mac][3][1], data[mac][5])) + print("Write finished") def prune(): global seen + global seen_towrite + global last_towrite_add seen_pruned = {} now = time.time() - t0 for mac in seen: if seen[mac][0] + TIMEOUT > now: seen_pruned[mac] = seen[mac] + else: + print("Add MAC", mac, "to write queue") + seen_towrite[mac] = seen[mac] + last_towrite_add = now seen = seen_pruned + if len(seen_towrite) >= WRITE_THRESHOLD or (last_towrite_add + WRITE_TIMEOUT < now and len(seen_towrite) > 0): + write_to_file(seen_towrite) + seen_towrite = {} + def process_scan_report(scan_report): ads = parse_advertisement_data(scan_report[0]) @@ -123,7 +168,7 @@ def show_stats(): seen_copy = seen.copy() for mac in seen_copy: info = seen_copy[mac] - if info[1]: + if info[4]: seen_apple += 1 else: seen_google += 1 @@ -224,5 +269,6 @@ while True: if pause == 0: show_stats() pause = 10 + prune() time.sleep(0.1)
To parse the written date on a PC, you can use struct.unpack() like in the example below
import struct file = open("exno.bin", "rb") data = file.read() for i in range(0,int(len(data)/56)): struct.unpack("<6sHiiiQQ20s", data[i*56:(i+1)*56]) file.close()
Is there anything I overlooked that is needed for the data to be useful?
and a simple addition to toggle the display backlight:
if v & buttons.BOTTOM_RIGHT: if bl: bl = False disp.backlight(0) else: bl = True disp.backlight(100)
and declare the variable
bl = True
above the while loopUpdate: after 7 hours of continuous use (with backlight off most of the time, LED blink on for every RX and MAC and vibration off), the Battery only dropped to 3.9V (so about 80-85% left)
Edited by Jakob