Skip to content
Snippets Groups Projects
Verified Commit 58a15f38 authored by xiretza's avatar xiretza
Browse files

refactor(ecg-app): Use button_gestures module

parent ffdce10d
No related tags found
No related merge requests found
Pipeline #3918 failed
import button_gestures as bg
import os import os
import display import display
import leds import leds
...@@ -26,17 +27,23 @@ COLOR_WRITE_FG = [255, 255, 255] ...@@ -26,17 +27,23 @@ COLOR_WRITE_FG = [255, 255, 255]
COLOR_WRITE_BG = [255, 0, 0] COLOR_WRITE_BG = [255, 0, 0]
current_mode = MODE_FINGER current_mode = MODE_FINGER
# FIFO from callback
new_datasets = []
history = [] history = []
filebuffer = bytearray() filebuffer = bytearray()
write = 0 recording_timestamp = 0
bias = True bias = True
update_screen = 0 samples_since_draw = 0
pause_screen = 0 pause_screen = 0
pause_histogram = False pause_histogram = False
histogram_offset = 0 histogram_offset = 0
sensor = 0 sensor = 0
disp = display.open() disp = display.open()
mapper = bg.ActionMapper()
KEYMAP_PAUSED = {}
KEYMAP_NORMAL = {}
leds.dim_top(1) leds.dim_top(1)
COLORS = [((23 + (15 * i)) % 360, 1.0, 1.0) for i in range(11)] COLORS = [((23 + (15 * i)) % 360, 1.0, 1.0) for i in range(11)]
...@@ -47,9 +54,9 @@ alpha = 2 ...@@ -47,9 +54,9 @@ alpha = 2
beta = 3 beta = 3
def update_history(datasets): def update_history(samples):
global history, moving_average, alpha, beta global history, moving_average, alpha, beta
for val in datasets: for val in samples:
history.append(val - moving_average) history.append(val - moving_average)
moving_average = (alpha * moving_average + beta * val) / (alpha + beta) moving_average = (alpha * moving_average + beta * val) / (alpha + beta)
...@@ -75,8 +82,8 @@ def neighbours(n, lst): ...@@ -75,8 +82,8 @@ def neighbours(n, lst):
yield lst[i : i + n] yield lst[i : i + n]
def detect_pulse(num_new_samples): def detect_pulse(all_samples, num_new_samples):
global history, pulse, samples_since_last_pulse, q_threshold, r_threshold, q_spike global pulse, samples_since_last_pulse, q_threshold, r_threshold, q_spike
# look at 3 consecutive samples, starting 2 samples before the samples that were just added, e.g.: # look at 3 consecutive samples, starting 2 samples before the samples that were just added, e.g.:
# existing samples: "ABCD" # existing samples: "ABCD"
...@@ -84,7 +91,7 @@ def detect_pulse(num_new_samples): ...@@ -84,7 +91,7 @@ def detect_pulse(num_new_samples):
# consider ["CDE", "DEF"] # consider ["CDE", "DEF"]
# new samples: "GHI" => "ABCDEFGHI" # new samples: "GHI" => "ABCDEFGHI"
# consider ["EFG", "FGH", "GHI"] # consider ["EFG", "FGH", "GHI"]
for [prev, cur, next_] in neighbours(3, history[-(num_new_samples + 2) :]): for [prev, cur, next_] in neighbours(3, all_samples[-(num_new_samples + 2) :]):
samples_since_last_pulse += 1 samples_since_last_pulse += 1
if prev > cur < next_ and cur < q_threshold: if prev > cur < next_ and cur < q_threshold:
...@@ -110,32 +117,36 @@ def detect_pulse(num_new_samples): ...@@ -110,32 +117,36 @@ def detect_pulse(num_new_samples):
pulse = -1 pulse = -1
def callback_ecg(datasets): def callback_ecg(dataset):
global update_screen, history, filebuffer, write new_datasets.append(dataset)
update_screen += len(datasets)
def process_samples(samples):
global samples_since_draw, history, filebuffer, recording_timestamp
samples_since_draw += len(samples)
# update histogram datalist # update histogram datalist
if not pause_histogram: if not pause_histogram:
update_history(datasets) update_history(samples)
detect_pulse(len(datasets)) detect_pulse(history, len(samples))
# buffer for writes # buffer for writes
if write > 0: if recording_timestamp > 0:
for value in datasets: for value in samples:
filebuffer.extend(struct.pack("h", value)) filebuffer.extend(struct.pack("h", value))
if len(filebuffer) >= FILEBUFFERBLOCK: if len(filebuffer) >= FILEBUFFERBLOCK:
write_filebuffer() write_filebuffer()
# don't update on every callback # don't update on every callback
if update_screen >= DRAW_AFTER_SAMPLES: if samples_since_draw >= DRAW_AFTER_SAMPLES:
draw_histogram() draw_histogram()
samples_since_draw = 0
def write_filebuffer(): def write_filebuffer():
global write, filebuffer global recording_timestamp, filebuffer
# write to file # write to file
chars = "" chars = ""
lt = utime.localtime(write) lt = utime.localtime(recording_timestamp)
filename = "/ecg-{:04d}-{:02d}-{:02d}_{:02d}{:02d}{:02d}.log".format(*lt) filename = "/ecg-{:04d}-{:02d}-{:02d}_{:02d}{:02d}{:02d}.log".format(*lt)
# write stuff to disk # write stuff to disk
...@@ -145,7 +156,7 @@ def write_filebuffer(): ...@@ -145,7 +156,7 @@ def write_filebuffer():
f.close() f.close()
except OSError as e: except OSError as e:
print("Please check the file or filesystem", e) print("Please check the file or filesystem", e)
write = 0 recording_timestamp = 0
pause_screen = -1 pause_screen = -1
disp.clear(COLOR_BACKGROUND) disp.clear(COLOR_BACKGROUND)
disp.print("IO Error", posy=0, fg=COLOR_TEXT) disp.print("IO Error", posy=0, fg=COLOR_TEXT)
...@@ -156,7 +167,7 @@ def write_filebuffer(): ...@@ -156,7 +167,7 @@ def write_filebuffer():
close_sensor() close_sensor()
except: except:
print("Unexpected error, stop writeing logfile") print("Unexpected error, stop writeing logfile")
write = 0 recording_timestamp = 0
filebuffer = bytearray() filebuffer = bytearray()
...@@ -172,8 +183,8 @@ def open_sensor(): ...@@ -172,8 +183,8 @@ def open_sensor():
def close_sensor(): def close_sensor():
global sensor
sensor.close() sensor.close()
pass
def toggle_mode(): def toggle_mode():
...@@ -191,30 +202,52 @@ def toggle_bias(): ...@@ -191,30 +202,52 @@ def toggle_bias():
def toggle_write(): def toggle_write():
global write, disp, pause_screen global recording_timestamp, disp, pause_screen
pause_screen = utime.time_ms() + 1000 pause_screen = utime.time_ms() + 1000
disp.clear(COLOR_BACKGROUND) disp.clear(COLOR_BACKGROUND)
if write > 0: if recording_timestamp > 0:
write_filebuffer() write_filebuffer()
write = 0 recording_timestamp = 0
disp.print("Stop", posx=50, posy=20, fg=COLOR_TEXT) disp.print("Stop", posx=50, posy=20, fg=COLOR_TEXT)
disp.print("logging", posx=30, posy=40, fg=COLOR_TEXT) disp.print("logging", posx=30, posy=40, fg=COLOR_TEXT)
else: else:
filebuffer = bytearray() filebuffer = bytearray()
write = utime.time() recording_timestamp = utime.time()
disp.print("Start", posx=45, posy=20, fg=COLOR_TEXT) disp.print("Start", posx=45, posy=20, fg=COLOR_TEXT)
disp.print("logging", posx=30, posy=40, fg=COLOR_TEXT) disp.print("logging", posx=30, posy=40, fg=COLOR_TEXT)
disp.update() disp.update()
def toggle_pause(): def set_paused(paused):
global pause_histogram, histogram_offset, history global pause_histogram, histogram_offset, history
if pause_histogram:
pause_histogram = False pause_histogram = paused
history = [] histogram_offset = 0
if paused:
mapper.set_mapping(KEYMAP_PAUSED)
else: else:
pause_histogram = True mapper.set_mapping(KEYMAP_NORMAL)
history = []
def scroll_left():
global histogram_offset
l = len(history)
histogram_offset += ECG_RATE / 2
if l - histogram_offset < WIDTH:
histogram_offset = l - WIDTH
def scroll_right():
global histogram_offset
histogram_offset -= ECG_RATE / 2
histogram_offset -= histogram_offset % (ECG_RATE / 2)
if histogram_offset < 0:
histogram_offset = 0 histogram_offset = 0
...@@ -226,7 +259,7 @@ def draw_leds(val): ...@@ -226,7 +259,7 @@ def draw_leds(val):
def draw_histogram(): def draw_histogram():
global disp, history, current_mode, bias, write, pause_screen, update_screen global disp, history, current_mode, bias, recording_timestamp, pause_screen
# skip rendering due to message beeing shown # skip rendering due to message beeing shown
if pause_screen == -1: if pause_screen == -1:
...@@ -294,17 +327,29 @@ def draw_histogram(): ...@@ -294,17 +327,29 @@ def draw_histogram():
) )
# announce writing ecg log # announce writing ecg log
if write > 0: if recording_timestamp > 0:
t = utime.time() t = utime.time()
if write > 0 and t % 2 == 0: if recording_timestamp > 0 and t % 2 == 0:
disp.print("LOG", posx=0, posy=60, fg=COLOR_WRITE_FG, bg=COLOR_WRITE_BG) disp.print("LOG", posx=0, posy=60, fg=COLOR_WRITE_FG, bg=COLOR_WRITE_BG)
disp.update() disp.update()
update_screen = 0
def main(): def main():
global pause_histogram, histogram_offset global new_datasets, KEYMAP_NORMAL, KEYMAP_PAUSED
KEYMAP_NORMAL = {
(buttons.BOTTOM_LEFT, bg.SHORT_PRESS): toggle_write,
(buttons.BOTTOM_RIGHT, bg.SHORT_PRESS): toggle_bias,
(buttons.TOP_RIGHT, bg.SHORT_PRESS): toggle_mode,
(buttons.TOP_RIGHT, bg.LONG_PRESS): lambda: set_paused(True),
}
KEYMAP_PAUSED = {
(buttons.BOTTOM_LEFT, bg.SHORT_PRESS): scroll_left,
(buttons.BOTTOM_RIGHT, bg.SHORT_PRESS): scroll_right,
(buttons.TOP_RIGHT, bg.SHORT_PRESS): lambda: set_paused(False),
}
# show button layout # show button layout
disp.clear(COLOR_BACKGROUND) disp.clear(COLOR_BACKGROUND)
...@@ -317,71 +362,17 @@ def main(): ...@@ -317,71 +362,17 @@ def main():
# start ecg # start ecg
open_sensor() open_sensor()
while True:
button_pressed = {"BOTTOM_LEFT": 0, "BOTTOM_RIGHT": 0, "TOP_RIGHT": 0}
while True:
v = buttons.read(
buttons.BOTTOM_LEFT | buttons.BOTTOM_RIGHT | buttons.TOP_RIGHT
)
# BOTTOM LEFT
if button_pressed["BOTTOM_LEFT"] == 0 and v & buttons.BOTTOM_LEFT != 0:
button_pressed["BOTTOM_LEFT"] = utime.time_ms()
if not pause_histogram:
toggle_write()
else:
l = len(history)
histogram_offset += ECG_RATE / 2
if l - histogram_offset < WIDTH:
histogram_offset = l - WIDTH
if button_pressed["BOTTOM_LEFT"] > 0 and v & buttons.BOTTOM_LEFT == 0:
duration = utime.time_ms() - button_pressed["BOTTOM_LEFT"]
button_pressed["BOTTOM_LEFT"] = 0
# BOTTOM RIGHT
if button_pressed["BOTTOM_RIGHT"] == 0 and v & buttons.BOTTOM_RIGHT != 0:
button_pressed["BOTTOM_RIGHT"] = utime.time_ms()
if not pause_histogram:
toggle_bias()
else:
histogram_offset -= ECG_RATE / 2
histogram_offset -= histogram_offset % (ECG_RATE / 2)
if histogram_offset < 0:
histogram_offset = 0
if button_pressed["BOTTOM_RIGHT"] > 0 and v & buttons.BOTTOM_RIGHT == 0:
duration = utime.time_ms() - button_pressed["BOTTOM_RIGHT"]
button_pressed["BOTTOM_RIGHT"] = 0
# TOP RIGHT mapper.set_mapping(KEYMAP_NORMAL)
while True:
# down, and still pressed mapper.update()
if button_pressed["TOP_RIGHT"] > 0 and v & buttons.TOP_RIGHT != 0:
duration = utime.time_ms() - button_pressed["TOP_RIGHT"]
if duration > 1000:
button_pressed["TOP_RIGHT"] = -1
toggle_pause()
# register down event
elif button_pressed["TOP_RIGHT"] == 0 and v & buttons.TOP_RIGHT != 0:
button_pressed["TOP_RIGHT"] = utime.time_ms()
# register up event but event already called
if button_pressed["TOP_RIGHT"] == -1 and v & buttons.TOP_RIGHT == 0:
button_pressed["TOP_RIGHT"] = 0
# register normal up event processed_datasets = 0
elif button_pressed["TOP_RIGHT"] > 0 and v & buttons.TOP_RIGHT == 0: for dataset in new_datasets:
duration = utime.time_ms() - button_pressed["TOP_RIGHT"] process_samples(dataset)
button_pressed["TOP_RIGHT"] = 0 processed_datasets += 1
if pause_histogram:
toggle_pause()
else:
toggle_mode()
new_datasets = new_datasets[processed_datasets:]
if __name__ == "__main__": if __name__ == "__main__":
main() main()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment