diff --git a/preload/apps/ecg/__init__.py b/preload/apps/ecg/__init__.py index da30babcc85a0c11dd170900b24ce9ae543a46d9..775fb2cec01d8170f9aa1ccd2ff34cf7223d2f38 100644 --- a/preload/apps/ecg/__init__.py +++ b/preload/apps/ecg/__init__.py @@ -7,12 +7,13 @@ import max30001 import math import struct import itertools +from ecg.settings import * +config = ecg_settings() WIDTH = 160 HEIGHT = 80 OFFSET_Y = 49 -ECG_RATE = 128 -HISTORY_MAX = ECG_RATE * 4 +HISTORY_MAX = WIDTH * 4 DRAW_AFTER_SAMPLES = 5 SCALE_FACTOR = 30 MODE_USB = "USB" @@ -26,24 +27,18 @@ COLOR_MODE_USB = [0, 0, 255] COLOR_WRITE_FG = [255, 255, 255] COLOR_WRITE_BG = [255, 0, 0] -current_mode = MODE_FINGER -modes = itertools.cycle( - [ - ({"bar", "pulse"}, {"text": "Top + Pulse", "posx": 0}), - ({}, {"text": "off", "posx": 55}), - ({"bar"}, {"text": "Top Only", "posx": 25}), - ({"pulse"}, {"text": "Pulse Only", "posx": 5}), - ] -) -led_mode = next(modes)[0] history = [] + +# variables for file output filebuffer = bytearray() write = 0 -bias = True +write_time_string = "" +samples_since_start_of_write = 0 + update_screen = 0 pause_screen = 0 -pause_histogram = False -histogram_offset = 0 +pause_graph = False +graph_offset = 0 sensor = 0 disp = display.open() last_sample_count = 1 @@ -64,7 +59,7 @@ def update_history(datasets): global history, moving_average, alpha, beta, last_sample_count last_sample_count = len(datasets) for val in datasets: - if current_mode == MODE_FINGER: + if "HP" in config.get_option("Filter"): history.append(val - moving_average) moving_average += betadash * (val - moving_average) # identical to: moving_average = (alpha * moving_average + beta * val) / (alpha + beta) @@ -81,7 +76,7 @@ samples_since_last_pulse = 0 last_pulse_blink = 0 q_threshold = -1 r_threshold = 1 -q_spike = -ECG_RATE +q_spike = -500 # just needs to be long ago def neighbours(n, lst): @@ -103,6 +98,8 @@ def detect_pulse(num_new_samples): # consider ["CDE", "DEF"] # new samples: "GHI" => "ABCDEFGHI" # consider ["EFG", "FGH", "GHI"] + ecg_rate = config.get_option("Rate") + for [prev, cur, next_] in neighbours(3, history[-(num_new_samples + 2) :]): samples_since_last_pulse += 1 @@ -113,33 +110,37 @@ def detect_pulse(num_new_samples): elif ( prev < cur > next_ and cur > r_threshold - and samples_since_last_pulse - q_spike < ECG_RATE // 10 + and samples_since_last_pulse - q_spike < ecg_rate // 10 ): - # the full QRS complex is < 0.1s long, so the q and r spike in particular cannot be more than ECG_RATE//10 samples apart - pulse = 60 * ECG_RATE // samples_since_last_pulse - samples_since_last_pulse = 0 - q_spike = -ECG_RATE + # the full QRS complex is < 0.1s long, so the q and r spike in particular cannot be more than ecg_rate//10 samples apart + pulse = 60 * ecg_rate // samples_since_last_pulse + q_spike = -ecg_rate if pulse < 30 or pulse > 210: pulse = -1 + elif write > 0 and "pulse" in config.get_option("Log"): + write_pulse() # we expect the next r-spike to be at least 60% as high as this one r_threshold = (cur * 3) // 5 - elif samples_since_last_pulse > 2 * ECG_RATE: + samples_since_last_pulse = 0 + elif samples_since_last_pulse > 2 * ecg_rate: q_threshold = -1 r_threshold = 1 pulse = -1 def callback_ecg(datasets): - global update_screen, history, filebuffer, write + global update_screen, history, filebuffer, write, samples_since_start_of_write update_screen += len(datasets) + if write > 0: + samples_since_start_of_write += len(datasets) - # update histogram datalist - if not pause_histogram: + # update graph datalist + if not pause_graph: update_history(datasets) detect_pulse(len(datasets)) # buffer for writes - if write > 0: + if write > 0 and "graph" in config.get_option("Log"): for value in datasets: filebuffer.extend(struct.pack("h", value)) if len(filebuffer) >= FILEBUFFERBLOCK: @@ -147,20 +148,18 @@ def callback_ecg(datasets): # don't update on every callback if update_screen >= DRAW_AFTER_SAMPLES: - draw_histogram() + draw_graph() -def write_filebuffer(): - global write, filebuffer +def append_to_file(fileprefix, content): + global write, pause_screen # write to file - chars = "" - lt = utime.localtime(write) - filename = "/ecg-{:04d}-{:02d}-{:02d}_{:02d}{:02d}{:02d}.log".format(*lt) + filename = "/ecg_logs/{}-{}.log".format(fileprefix, write_time_string) # write stuff to disk try: f = open(filename, "ab") - f.write(filebuffer) + f.write(content) f.close() except OSError as e: print("Please check the file or filesystem", e) @@ -174,18 +173,28 @@ def write_filebuffer(): disp.update() close_sensor() except: - print("Unexpected error, stop writeing logfile") + print("Unexpected error, stop writing logfile") write = 0 + +def write_pulse(): + # estimates timestamp as calls to utime.time() take too much time + approx_timestamp = write + samples_since_start_of_write // config.get_option("Rate") + append_to_file("pulse", struct.pack("ib", approx_timestamp, pulse)) + + +def write_filebuffer(): + global filebuffer + append_to_file("ecg", filebuffer) filebuffer = bytearray() def open_sensor(): global sensor sensor = max30001.MAX30001( - usb=(current_mode == MODE_USB), - bias=bias, - sample_rate=ECG_RATE, + usb=(config.get_option("Mode") == "USB"), + bias=config.get_option("Bias"), + sample_rate=config.get_option("Rate"), callback=callback_ecg, ) @@ -195,36 +204,8 @@ def close_sensor(): sensor.close() -def toggle_mode(): - global current_mode, disp, pause_screen - if write > 0: - pause_screen = utime.time_ms() + 500 - disp.clear(COLOR_BACKGROUND) - disp.print("Locked", posx=30, posy=30, fg=COLOR_TEXT) - disp.update() - return - - close_sensor() - current_mode = MODE_USB if current_mode == MODE_FINGER else MODE_FINGER - open_sensor() - - -def toggle_bias(): - global bias, disp, pause_screen - if write > 0: - pause_screen = utime.time_ms() + 500 - disp.clear(COLOR_BACKGROUND) - disp.print("Locked", posx=30, posy=30, fg=COLOR_TEXT) - disp.update() - return - - close_sensor() - bias = not bias - open_sensor() - - def toggle_write(): - global write, disp, pause_screen + global write, disp, pause_screen, filebuffer, samples_since_start_of_write, write_time_string pause_screen = utime.time_ms() + 1000 disp.clear(COLOR_BACKGROUND) if write > 0: @@ -235,6 +216,13 @@ def toggle_write(): else: filebuffer = bytearray() write = utime.time() + lt = utime.localtime(write) + write_time_string = "{:04d}-{:02d}-{:02d}_{:02d}{:02d}{:02d}".format(*lt) + samples_since_start_of_write = 0 + try: + os.mkdir("ecg_logs") + except: + pass disp.print("Start", posx=45, posy=20, fg=COLOR_TEXT) disp.print("logging", posx=30, posy=40, fg=COLOR_TEXT) @@ -242,25 +230,13 @@ def toggle_write(): def toggle_pause(): - global pause_histogram, histogram_offset, history, leds - if pause_histogram: - pause_histogram = False + global pause_graph, graph_offset, history, leds + if pause_graph: + pause_graph = False history = [] else: - pause_histogram = True - histogram_offset = 0 - leds.clear() - - -def toggle_leds(): - global led_mode, disp, pause_screen, leds, modes - led_mode, display_args = next(modes) - - pause_screen = utime.time_ms() + 250 - disp.clear(COLOR_BACKGROUND) - disp.print("LEDs", posx=50, posy=20, fg=COLOR_TEXT) - disp.print(**display_args, posy=40, fg=COLOR_TEXT) - disp.update() + pause_graph = True + graph_offset = 0 leds.clear() @@ -269,6 +245,8 @@ def draw_leds(vmin, vmax): # vmax should be in [0, 1] global pulse, samples_since_last_pulse, last_pulse_blink + led_mode = config.get_option("LEDs") + # stop blinking if not bool(led_mode): return @@ -300,8 +278,8 @@ def draw_leds(vmin, vmax): leds.update() -def draw_histogram(): - global disp, history, current_mode, bias, write, pause_screen, update_screen +def draw_graph(): + global disp, history, write, pause_screen, update_screen # skip rendering due to message beeing shown if pause_screen == -1: @@ -315,32 +293,32 @@ def draw_histogram(): disp.clear(COLOR_BACKGROUND) - # offset in pause_histogram mode - window_end = int(len(history) - histogram_offset) - s_start = max(0, window_end - (ECG_RATE * 2)) + # offset in pause_graph mode + timeWindow = config.get_option("Window") + window_end = int(len(history) - graph_offset) s_end = max(0, window_end) - s_draw = max(0, s_end - WIDTH) + s_start = max(0, s_end - WIDTH * timeWindow) # get max value and calc scale value_max = max(abs(x) for x in history[s_start:s_end]) scale = SCALE_FACTOR / (value_max if value_max > 0 else 1) - # draw histogram + # draw graph # values need to be inverted so high values are drawn with low pixel coordinates (at the top of the screen) - draw_points = (int(-x * scale + OFFSET_Y) for x in history[s_draw:s_end]) + draw_points = (int(-x * scale + OFFSET_Y) for x in history[s_start:s_end]) prev = next(draw_points) for x, value in enumerate(draw_points): - disp.line(x, prev, x + 1, value, col=COLOR_LINE) + disp.line(x // timeWindow, prev, (x + 1) // timeWindow, value, col=COLOR_LINE) prev = value # draw text: mode/bias/write - if pause_histogram: + if pause_graph: disp.print( "Pause" + ( - " -{:0.1f}s".format(histogram_offset / ECG_RATE) - if histogram_offset > 0 + " -{:0.1f}s".format(graph_offset / config.get_option("Rate")) + if graph_offset > 0 else "" ), posx=0, @@ -354,11 +332,14 @@ def draw_histogram(): ) if pulse < 0: disp.print( - current_mode + ("+Bias" if bias else ""), + config.get_option("Mode") + + ("+Bias" if config.get_option("Bias") else ""), posx=0, posy=0, fg=( - COLOR_MODE_FINGER if current_mode == MODE_FINGER else COLOR_MODE_USB + COLOR_MODE_FINGER + if config.get_option("Mode") == MODE_FINGER + else COLOR_MODE_USB ), ) else: @@ -367,7 +348,9 @@ def draw_histogram(): posx=0, posy=0, fg=( - COLOR_MODE_FINGER if current_mode == MODE_FINGER else COLOR_MODE_USB + COLOR_MODE_FINGER + if config.get_option("Mode") == MODE_FINGER + else COLOR_MODE_USB ), ) @@ -382,7 +365,7 @@ def draw_histogram(): def main(): - global pause_histogram, histogram_offset + global pause_graph, graph_offset, pause_screen # show button layout disp.clear(COLOR_BACKGROUND) @@ -392,10 +375,10 @@ def main(): " Pause >", posx=0, posy=28, fg=COLOR_MODE_FINGER, font=display.FONT16 ) disp.print( - " Mode/Bias >", posx=0, posy=44, fg=COLOR_MODE_USB, font=display.FONT16 + " Settings >", posx=0, posy=44, fg=COLOR_MODE_USB, font=display.FONT16 ) disp.print( - "< LED/WriteLog", posx=0, posy=64, fg=COLOR_WRITE_BG, font=display.FONT16 + "< WriteLog ", posx=0, posy=64, fg=COLOR_WRITE_BG, font=display.FONT16 ) disp.update() utime.sleep(3) @@ -410,90 +393,64 @@ def main(): ) # TOP RIGHT + # + # pause # down if button_pressed["TOP_RIGHT"] == 0 and v & buttons.TOP_RIGHT != 0: - button_pressed["TOP_RIGHT"] = utime.time_ms() + button_pressed["TOP_RIGHT"] = 1 toggle_pause() # up if button_pressed["TOP_RIGHT"] > 0 and v & buttons.TOP_RIGHT == 0: - duration = utime.time_ms() - button_pressed["TOP_RIGHT"] button_pressed["TOP_RIGHT"] = 0 # BOTTOM LEFT # # on pause = shift view left - # long = toggle write - # short = toggle leds - - # down, and still pressed - if ( - button_pressed["BOTTOM_LEFT"] > 0 - and v & buttons.BOTTOM_LEFT != 0 - and not pause_histogram - ): - duration = utime.time_ms() - button_pressed["BOTTOM_LEFT"] - if duration > 1000: - button_pressed["BOTTOM_LEFT"] = -1 - toggle_write() - - # register down event - elif button_pressed["BOTTOM_LEFT"] == 0 and v & buttons.BOTTOM_LEFT != 0: - button_pressed["BOTTOM_LEFT"] = utime.time_ms() + # else = toggle write - # register up event but event already called - if button_pressed["BOTTOM_LEFT"] == -1 and v & buttons.BOTTOM_LEFT == 0: - button_pressed["BOTTOM_LEFT"] = 0 + # down + if button_pressed["BOTTOM_LEFT"] == 0 and v & buttons.BOTTOM_LEFT != 0: + button_pressed["BOTTOM_LEFT"] = 1 + if pause_graph: + l = len(history) + graph_offset += config.get_option("Rate") / 2 + if l - graph_offset < WIDTH * config.get_option("Window"): + graph_offset = l - WIDTH * config.get_option("Window") + else: + toggle_write() - # register normal up event - elif button_pressed["BOTTOM_LEFT"] > 0 and v & buttons.BOTTOM_LEFT == 0: - duration = utime.time_ms() - button_pressed["BOTTOM_LEFT"] + # up + if button_pressed["BOTTOM_LEFT"] > 0 and v & buttons.BOTTOM_LEFT == 0: button_pressed["BOTTOM_LEFT"] = 0 - if not pause_histogram: - toggle_leds() - else: - l = len(history) - histogram_offset += ECG_RATE / 2 - if l - histogram_offset < WIDTH: - histogram_offset = l - WIDTH # BOTTOM RIGHT # # on pause = shift view right - # long = toggle bias - # short = toggle mode (finger/usb) - - # down, and still pressed - if ( - button_pressed["BOTTOM_RIGHT"] > 0 - and v & buttons.BOTTOM_RIGHT != 0 - and not pause_histogram - ): - duration = utime.time_ms() - button_pressed["BOTTOM_RIGHT"] - if duration > 1000: - button_pressed["BOTTOM_RIGHT"] = -1 - toggle_bias() - - # register down event - elif button_pressed["BOTTOM_RIGHT"] == 0 and v & buttons.BOTTOM_RIGHT != 0: - button_pressed["BOTTOM_RIGHT"] = utime.time_ms() - - # register up event but event already called - if button_pressed["BOTTOM_RIGHT"] == -1 and v & buttons.BOTTOM_RIGHT == 0: - button_pressed["BOTTOM_RIGHT"] = 0 + # else = show settings - # register normal up event - elif button_pressed["BOTTOM_RIGHT"] > 0 and v & buttons.BOTTOM_RIGHT == 0: - duration = utime.time_ms() - button_pressed["BOTTOM_RIGHT"] - button_pressed["BOTTOM_RIGHT"] = 0 - if pause_histogram: - histogram_offset -= ECG_RATE / 2 - histogram_offset -= histogram_offset % (ECG_RATE / 2) - if histogram_offset < 0: - histogram_offset = 0 + # down + if button_pressed["BOTTOM_RIGHT"] == 0 and v & buttons.BOTTOM_RIGHT != 0: + button_pressed["BOTTOM_RIGHT"] = 1 + if pause_graph: + graph_offset -= config.get_option("Rate") / 2 + graph_offset -= graph_offset % (config.get_option("Rate") / 2) + if graph_offset < 0: + graph_offset = 0 else: - toggle_mode() + pause_screen = -1 # hide graph + leds.clear() # disable all LEDs + config.run() # show config menu + close_sensor() # reset sensor in case mode or bias was changed TODO do not close sensor otherwise? + open_sensor() + pause_screen = 0 # start plotting graph again + # returning from menu was by pressing the TOP_RIGHT button + button_pressed["TOP_RIGHT"] = 1 + + # up + if button_pressed["BOTTOM_RIGHT"] > 0 and v & buttons.BOTTOM_RIGHT == 0: + button_pressed["BOTTOM_RIGHT"] = 0 if __name__ == "__main__": diff --git a/preload/apps/ecg/settings.py b/preload/apps/ecg/settings.py new file mode 100644 index 0000000000000000000000000000000000000000..ca49bbd109a0b7d372b7320ea5b89c4444fd396b --- /dev/null +++ b/preload/apps/ecg/settings.py @@ -0,0 +1,113 @@ +import color +import simple_menu +import itertools + + +class Settings(simple_menu.Menu): + color_1 = color.CAMPGREEN + color_2 = color.CAMPGREEN_DARK + + selected_options = {} + + def __init__(self): + super().__init__([("return", False)]) + + def on_select(self, value, index): + if index == 0: + self.exit() + else: + self.selected_options[value[0]] = next(value[1]) + self.write_to_file() + + def entry2name(self, value): + if value[0] == "return": + return value[0] + else: + return "{}: {}".format(value[0], self.selected_options[value[0]][0]) + + def add_option(self, option): + self.entries.append(option) + self.selected_options[option[0]] = next(option[1]) + + def get_option(self, name): + return self.selected_options[name][1] + + def load_from_file(self): + config_path = "/".join(__file__.split("/")[0:-1]) + try: + f = open("{}/config.cfg".format(config_path), "r") + for line in f: + parts = [x.strip() for x in line.split(":")] + if parts[0] in self.selected_options: + # find corresponding entry from menu to get access to the corresponding itertools.cycle + option_cycle = next(x for x in self.entries if x[0] == parts[0])[1] + if self.selected_options[parts[0]][0] != parts[1]: + previous = self.selected_options[parts[0]][0] + self.selected_options[parts[0]] = next(option_cycle) + while self.selected_options[parts[0]][0] not in { + parts[1], + previous, + }: + self.selected_options[parts[0]] = next(option_cycle) + + if self.selected_options[parts[0]][0] == previous: + print( + "Settings: unknown option '{}' for key '{}'".format( + parts[1], parts[0] + ) + ) + else: + print("Settings: unknown key '{}'".format(parts[0])) + f.close() + except OSError: + print("Settings could not be loaded from file. Maybe it did not exist yet?") + + def write_to_file(self): + config_path = "/".join(__file__.split("/")[0:-1]) + try: + f = open("{}/config.cfg".format(config_path), "w") + for option_name in self.selected_options: + f.write( + "{}:{}\n".format(option_name, self.selected_options[option_name][0]) + ) + f.close() + except OSError as e: + print("Settings could not be written to file! Error: {}".format(e)) + + +def ecg_settings(): + config = Settings() + config.add_option( + ( + "LEDs", + itertools.cycle( + [ + ("off", {}), + ("Pulse", {"pulse"}), + ("Bar", {"bar"}), + ("Full", {"pulse", "bar"}), + ] + ), + ) + ) + config.add_option(("Mode", itertools.cycle([("Finger", "Finger"), ("USB", "USB")]))) + config.add_option(("Bias", itertools.cycle([("on", True), ("off", False)]))) + config.add_option(("Filter", itertools.cycle([("HP", {"HP"}), ("off", {})]))) + config.add_option(("Rate", itertools.cycle([("128Hz", 128), ("256Hz", 256)]))) + config.add_option(("Window", itertools.cycle([("1x", 1), ("2x", 2), ("3x", 3)]))) + config.add_option( + ( + "Log", + itertools.cycle( + [ + ("graph", {"graph"}), + ("pulse", {"pulse"}), + ("full", {"graph", "pulse"}), + ] + ), + ) + ) + + config.load_from_file() + + return config