From 94cc9f71360bd2e54690ac9bf9bccc0126be5e83 Mon Sep 17 00:00:00 2001 From: schneider <schneider@blinkenlichts.net> Date: Sun, 14 Nov 2021 00:43:19 +0100 Subject: [PATCH] ecg: Convert ECG app to use a class. Remove use of "global" --- preload/apps/ecg/__init__.py | 905 +++++++++++++++++------------------ 1 file changed, 447 insertions(+), 458 deletions(-) diff --git a/preload/apps/ecg/__init__.py b/preload/apps/ecg/__init__.py index a19cebc8..e9b1599a 100644 --- a/preload/apps/ecg/__init__.py +++ b/preload/apps/ecg/__init__.py @@ -11,6 +11,7 @@ import bluetooth from ecg.settings import * config = ecg_settings() + WIDTH = 160 HEIGHT = 80 OFFSET_Y = 49 @@ -27,501 +28,489 @@ COLOR_MODE_FINGER = [0, 255, 0] COLOR_MODE_USB = [0, 0, 255] COLOR_WRITE_FG = [255, 255, 255] COLOR_WRITE_BG = [255, 0, 0] - -history = [] - -# variables for file output -filebuffer = bytearray() -write = 0 -write_time_string = "" -samples_since_start_of_write = 0 - -update_screen = 0 -pause_screen = 0 -pause_graph = False -graph_offset = 0 -sensor = 0 -disp = display.open() -last_sample_count = 1 - +COLORS = [((23 + (15 * i)) % 360, 1.0, 1.0) for i in range(11)] _IRQ_CENTRAL_CONNECT = const(1) _IRQ_CENTRAL_DISCONNECT = const(2) _IRQ_GATTS_WRITE = const(3) -def ble_irq(event, data): - global ble_streaming - if event == _IRQ_CENTRAL_CONNECT: - print("BLE Connected") - elif event == _IRQ_CENTRAL_DISCONNECT: - print("BLE Disconnected") - elif event == _IRQ_GATTS_WRITE: - conn_handle, value_handle = data - if value_handle == ecg_cccd_handle: - value = b.gatts_read(value_handle) - print("New cccd value:", value) - # Value of 0 means notifcations off - if value == b"\x00\x00": - ble_streaming = False - if not config.get_option("BLE Disp"): - disp.backlight(20) - else: - ble_streaming = True - if not config.get_option("BLE Disp"): - disp.backlight(0) - - -b = bluetooth.BLE() -b.active(True) -b.irq(ble_irq) - -_ECG_UUID = bluetooth.UUID("42230300-2342-2342-2342-234223422342") -_ECG_DATA = ( - bluetooth.UUID("42230301-2342-2342-2342-234223422342"), - bluetooth.FLAG_READ | bluetooth.FLAG_NOTIFY, -) - -_ECG_SERVICE = (_ECG_UUID, (_ECG_DATA,)) -((ecg_data_handle,),) = b.gatts_register_services((_ECG_SERVICE,)) -ecg_cccd_handle = ecg_data_handle + 1 - -# Disable streaming by default -ble_streaming = False -b.gatts_write(ecg_cccd_handle, "\x00\x00") - -leds.dim_top(1) -COLORS = [((23 + (15 * i)) % 360, 1.0, 1.0) for i in range(11)] - -# variables for high-pass filter -# note: corresponds to 1st order hpf with -3dB at ~18.7Hz -# general formula: f(-3dB)=-(sample_rate/tau)*ln(1-betadash) -moving_average = 0 -alpha = 2 -beta = 3 -betadash = beta / (alpha + beta) - - -def update_history(datasets): - global history, moving_average, alpha, beta, last_sample_count - last_sample_count = len(datasets) - for val in datasets: - if "HP" in config.get_option("Filter"): - history.append(val - moving_average) - moving_average += betadash * (val - moving_average) - # identical to: moving_average = (alpha * moving_average + beta * val) / (alpha + beta) - else: - history.append(val) - - # trim old elements - history = history[-HISTORY_MAX:] - - -# variables for pulse detection -pulse = -1 -samples_since_last_pulse = 0 -last_pulse_blink = 0 -q_threshold = -1 -r_threshold = 1 -q_spike = -500 # just needs to be long ago - +class ECG: + history = [] -def neighbours(n, lst): - """ - neighbours(2, "ABCDE") = ("AB", "BC", "CD", "DE") - neighbours(3, "ABCDE") = ("ABC", "BCD", "CDE") - """ - - for i in range(len(lst) - (n - 1)): - yield lst[i : i + n] + # variables for file output + filebuffer = bytearray() + write = 0 + write_time_string = "" + samples_since_start_of_write = 0 + update_screen = 0 + pause_screen = 0 + pause_graph = False + graph_offset = 0 + sensor = None + disp = display.open() + last_sample_count = 1 + + # variables for high-pass filter + # note: corresponds to 1st order hpf with -3dB at ~18.7Hz + # general formula: f(-3dB)=-(sample_rate/tau)*ln(1-betadash) + moving_average = 0 + alpha = 2 + beta = 3 + betadash = beta / (alpha + beta) + + # variables for pulse detection + pulse = -1 + samples_since_last_pulse = 0 + last_pulse_blink = 0 + q_threshold = -1 + r_threshold = 1 + q_spike = -500 # just needs to be long ago + + def ble_irq(self, event, data): + if event == _IRQ_CENTRAL_CONNECT: + print("BLE Connected") + elif event == _IRQ_CENTRAL_DISCONNECT: + print("BLE Disconnected") + elif event == _IRQ_GATTS_WRITE: + conn_handle, value_handle = data + if value_handle == self.ecg_cccd_handle: + value = self.b.gatts_read(value_handle) + print("New cccd value:", value) + # Value of 0 means notifcations off + if value == b"\x00\x00": + self.ble_streaming = False + if not config.get_option("BLE Disp"): + self.disp.backlight(20) + else: + self.ble_streaming = True + if not config.get_option("BLE Disp"): + self.disp.backlight(0) + + def __init__(self): + self.b = bluetooth.BLE() + self.b.active(True) + self.b.irq(self.ble_irq) + + _ECG_UUID = bluetooth.UUID("42230300-2342-2342-2342-234223422342") + _ECG_DATA = ( + bluetooth.UUID("42230301-2342-2342-2342-234223422342"), + bluetooth.FLAG_READ | bluetooth.FLAG_NOTIFY, + ) -def detect_pulse(num_new_samples): - global history, pulse, samples_since_last_pulse, q_threshold, r_threshold, q_spike + _ECG_SERVICE = (_ECG_UUID, (_ECG_DATA,)) + ((self.ecg_data_handle,),) = self.b.gatts_register_services((_ECG_SERVICE,)) + self.ecg_cccd_handle = self.ecg_data_handle + 1 - # look at 3 consecutive samples, starting 2 samples before the samples that were just added, e.g.: - # existing samples: "ABCD" - # new samples: "EF" => "ABCDEF" - # consider ["CDE", "DEF"] - # new samples: "GHI" => "ABCDEFGHI" - # consider ["EFG", "FGH", "GHI"] - ecg_rate = config.get_option("Rate") + # Disable streaming by default + self.ble_streaming = False + self.b.gatts_write(self.ecg_cccd_handle, "\x00\x00") - for [prev, cur, next_] in neighbours(3, history[-(num_new_samples + 2) :]): - samples_since_last_pulse += 1 + leds.dim_top(1) - if prev > cur < next_ and cur < q_threshold: - q_spike = samples_since_last_pulse - # we expect the next q-spike to be at least 60% as high as this one - q_threshold = (cur * 3) // 5 - elif ( - prev < cur > next_ - and cur > r_threshold - and samples_since_last_pulse - q_spike < ecg_rate // 10 - ): - # the full QRS complex is < 0.1s long, so the q and r spike in particular cannot be more than ecg_rate//10 samples apart - pulse = 60 * ecg_rate // samples_since_last_pulse - q_spike = -ecg_rate - if pulse < 30 or pulse > 210: - pulse = -1 - elif write > 0 and "pulse" in config.get_option("Log"): - write_pulse() - # we expect the next r-spike to be at least 60% as high as this one - r_threshold = (cur * 3) // 5 - samples_since_last_pulse = 0 - elif samples_since_last_pulse > 2 * ecg_rate: - q_threshold = -1 - r_threshold = 1 - pulse = -1 - - -def callback_ecg(datasets): - global update_screen, history, filebuffer, write, samples_since_start_of_write, ble_streaming, ecg_data_handle - - if ble_streaming: + def update_history(self, datasets): + self.last_sample_count = len(datasets) + for val in datasets: + if "HP" in config.get_option("Filter"): + self.history.append(val - self.moving_average) + self.moving_average += self.betadash * (val - self.moving_average) + # identical to: moving_average = (alpha * moving_average + beta * val) / (alpha + beta) + else: + self.history.append(val) + + # trim old elements + self.history = self.history[-HISTORY_MAX:] + + def detect_pulse(self, num_new_samples): + # look at 3 consecutive samples, starting 2 samples before the samples that were just added, e.g.: + # existing samples: "ABCD" + # new samples: "EF" => "ABCDEF" + # consider ["CDE", "DEF"] + # new samples: "GHI" => "ABCDEFGHI" + # consider ["EFG", "FGH", "GHI"] + ecg_rate = config.get_option("Rate") + + def neighbours(n, lst): + """ + neighbours(2, "ABCDE") = ("AB", "BC", "CD", "DE") + neighbours(3, "ABCDE") = ("ABC", "BCD", "CDE") + """ + + for i in range(len(lst) - (n - 1)): + yield lst[i : i + n] + + for [prev, cur, next_] in neighbours(3, self.history[-(num_new_samples + 2) :]): + self.samples_since_last_pulse += 1 + + if prev > cur < next_ and cur < self.q_threshold: + self.q_spike = self.samples_since_last_pulse + # we expect the next q-spike to be at least 60% as high as this one + self.q_threshold = (cur * 3) // 5 + elif ( + prev < cur > next_ + and cur > self.r_threshold + and self.samples_since_last_pulse - self.q_spike < ecg_rate // 10 + ): + # the full QRS complex is < 0.1s long, so the q and r spike in particular cannot be more than ecg_rate//10 samples apart + self.pulse = 60 * ecg_rate // self.samples_since_last_pulse + self.q_spike = -ecg_rate + if self.pulse < 30 or self.pulse > 210: + self.pulse = -1 + elif self.write > 0 and "pulse" in config.get_option("Log"): + self.write_pulse() + # we expect the next r-spike to be at least 60% as high as this one + self.r_threshold = (cur * 3) // 5 + self.samples_since_last_pulse = 0 + elif self.samples_since_last_pulse > 2 * ecg_rate: + self.q_threshold = -1 + self.r_threshold = 1 + self.pulse = -1 + + def callback_ecg(self, datasets): + if self.ble_streaming: + # try: + if 1: + self.b.gatts_notify( + 1, + self.ecg_data_handle, + struct.pack(">" + ("h" * len(datasets)), *datasets), + ) + # except: + pass + + # Don't update the screen if it should be off during a connection + if not config.get_option("BLE Disp"): + return + + self.update_screen += len(datasets) + if self.write > 0: + self.samples_since_start_of_write += len(datasets) + + # update graph datalist + if not self.pause_graph: + self.update_history(datasets) + self.detect_pulse(len(datasets)) + + # buffer for writes + if self.write > 0 and "graph" in config.get_option("Log"): + for value in datasets: + self.filebuffer.extend(struct.pack("h", value)) + if len(self.filebuffer) >= FILEBUFFERBLOCK: + self.write_filebuffer() + + # don't update on every callback + if self.update_screen >= DRAW_AFTER_SAMPLES: + self.draw_graph() + + def append_to_file(self, fileprefix, content): + # write to file + filename = "/ecg_logs/{}-{}.log".format(fileprefix, self.write_time_string) + + # write stuff to disk try: - b.gatts_notify( - 1, ecg_data_handle, struct.pack(">" + ("h" * len(datasets)), *datasets) - ) + f = open(filename, "ab") + f.write(content) + f.close() + except OSError as e: + print("Please check the file or filesystem", e) + self.write = 0 + self.pause_screen = -1 + self.disp.clear(COLOR_BACKGROUND) + self.disp.print("IO Error", posy=0, fg=COLOR_TEXT) + self.disp.print("Please check", posy=20, fg=COLOR_TEXT) + self.disp.print("your", posy=40, fg=COLOR_TEXT) + self.disp.print("filesystem", posy=60, fg=COLOR_TEXT) + self.disp.update() + self.close_sensor() except: - pass + print("Unexpected error, stop writing logfile") + self.write = 0 - # Don't update the screen if it should be off during a connection - if not config.get_option("BLE Disp"): - return - - update_screen += len(datasets) - if write > 0: - samples_since_start_of_write += len(datasets) - - # update graph datalist - if not pause_graph: - update_history(datasets) - detect_pulse(len(datasets)) - - # buffer for writes - if write > 0 and "graph" in config.get_option("Log"): - for value in datasets: - filebuffer.extend(struct.pack("h", value)) - if len(filebuffer) >= FILEBUFFERBLOCK: - write_filebuffer() - - # don't update on every callback - if update_screen >= DRAW_AFTER_SAMPLES: - draw_graph() - - -def append_to_file(fileprefix, content): - global write, pause_screen - # write to file - filename = "/ecg_logs/{}-{}.log".format(fileprefix, write_time_string) - - # write stuff to disk - try: - f = open(filename, "ab") - f.write(content) - f.close() - except OSError as e: - print("Please check the file or filesystem", e) - write = 0 - pause_screen = -1 - disp.clear(COLOR_BACKGROUND) - disp.print("IO Error", posy=0, fg=COLOR_TEXT) - disp.print("Please check", posy=20, fg=COLOR_TEXT) - disp.print("your", posy=40, fg=COLOR_TEXT) - disp.print("filesystem", posy=60, fg=COLOR_TEXT) - disp.update() - close_sensor() - except: - print("Unexpected error, stop writing logfile") - write = 0 - - -def write_pulse(): - # estimates timestamp as calls to time.time() take too much time - approx_timestamp = write + samples_since_start_of_write // config.get_option("Rate") - append_to_file("pulse", struct.pack("ib", approx_timestamp, pulse)) - - -def write_filebuffer(): - global filebuffer - append_to_file("ecg", filebuffer) - filebuffer = bytearray() + def write_pulse(self): + # estimates timestamp as calls to time.time() take too much time + approx_timestamp = ( + self.write + self.samples_since_start_of_write // config.get_option("Rate") + ) + self.append_to_file("pulse", struct.pack("ib", approx_timestamp, self.pulse)) + + def write_filebuffer(self): + self.append_to_file("ecg", self.filebuffer) + self.filebuffer = bytearray() + + def open_sensor(self): + self.sensor = max30001.MAX30001( + usb=(config.get_option("Mode") == "USB"), + bias=config.get_option("Bias"), + sample_rate=config.get_option("Rate"), + callback=self.callback_ecg, + ) + def close_sensor(self): + self.sensor.close() + + def toggle_write(self): + self.pause_screen = time.time_ms() + 1000 + self.disp.clear(COLOR_BACKGROUND) + if self.write > 0: + self.write_filebuffer() + self.write = 0 + self.disp.print("Stop", posx=50, posy=20, fg=COLOR_TEXT) + self.disp.print("logging", posx=30, posy=40, fg=COLOR_TEXT) + else: + self.filebuffer = bytearray() + self.write = time.time() + lt = time.localtime(self.write) + self.write_time_string = "{:04d}-{:02d}-{:02d}_{:02d}{:02d}{:02d}".format( + *lt + ) + self.samples_since_start_of_write = 0 + try: + os.mkdir("ecg_logs") + except: + pass + self.disp.print("Start", posx=45, posy=20, fg=COLOR_TEXT) + self.disp.print("logging", posx=30, posy=40, fg=COLOR_TEXT) + + self.disp.update() + + def toggle_pause(self): + if self.pause_graph: + self.pause_graph = False + self.history = [] + else: + self.pause_graph = True + self.graph_offset = 0 + leds.clear() -def open_sensor(): - global sensor - sensor = max30001.MAX30001( - usb=(config.get_option("Mode") == "USB"), - bias=config.get_option("Bias"), - sample_rate=config.get_option("Rate"), - callback=callback_ecg, - ) - - -def close_sensor(): - global sensor - sensor.close() - - -def toggle_write(): - global write, disp, pause_screen, filebuffer, samples_since_start_of_write, write_time_string - pause_screen = time.time_ms() + 1000 - disp.clear(COLOR_BACKGROUND) - if write > 0: - write_filebuffer() - write = 0 - disp.print("Stop", posx=50, posy=20, fg=COLOR_TEXT) - disp.print("logging", posx=30, posy=40, fg=COLOR_TEXT) - else: - filebuffer = bytearray() - write = time.time() - lt = time.localtime(write) - write_time_string = "{:04d}-{:02d}-{:02d}_{:02d}{:02d}{:02d}".format(*lt) - samples_since_start_of_write = 0 - try: - os.mkdir("ecg_logs") - except: - pass - disp.print("Start", posx=45, posy=20, fg=COLOR_TEXT) - disp.print("logging", posx=30, posy=40, fg=COLOR_TEXT) + def draw_leds(self, vmin, vmax): + # vmin should be in [0, -1] + # vmax should be in [0, 1] - disp.update() + led_mode = config.get_option("LEDs") + # stop blinking + if not bool(led_mode): + return -def toggle_pause(): - global pause_graph, graph_offset, history, leds - if pause_graph: - pause_graph = False - history = [] - else: - pause_graph = True - graph_offset = 0 - leds.clear() + # update led bar + if "bar" in led_mode: + for i in reversed(range(6)): + leds.prep_hsv( + 5 + i, COLORS[5 + i] if vmin <= 0 and i <= vmin * -6 else (0, 0, 0) + ) + for i in reversed(range(6)): + leds.prep_hsv( + i, COLORS[i] if vmax >= 0 and 5 - i <= vmax * 6 else (0, 0, 0) + ) + + # blink red on pulse + if ( + "pulse" in led_mode + and self.pulse > 0 + and self.samples_since_last_pulse < self.last_pulse_blink + ): + for i in range(4): + leds.prep(11 + i, (255, 0, 0)) + elif "pulse" in led_mode: + for i in range(4): + leds.prep(11 + i, (0, 0, 0)) + self.last_pulse_blink = self.samples_since_last_pulse + + leds.update() + + def draw_graph(self): + # skip rendering due to message beeing shown + if self.pause_screen == -1: + return + elif self.pause_screen > 0: + t = time.time_ms() + if t > self.pause_screen: + self.pause_screen = 0 + else: + return + self.disp.clear(COLOR_BACKGROUND) -def draw_leds(vmin, vmax): - # vmin should be in [0, -1] - # vmax should be in [0, 1] - global pulse, samples_since_last_pulse, last_pulse_blink + # offset in pause_graph mode + timeWindow = config.get_option("Window") + window_end = int(len(self.history) - self.graph_offset) + s_end = max(0, window_end) + s_start = max(0, s_end - WIDTH * timeWindow) - led_mode = config.get_option("LEDs") + # get max value and calc scale + value_max = max(abs(x) for x in self.history[s_start:s_end]) + scale = SCALE_FACTOR / (value_max if value_max > 0 else 1) - # stop blinking - if not bool(led_mode): - return + # draw graph + # values need to be inverted so high values are drawn with low pixel coordinates (at the top of the screen) + draw_points = (int(-x * scale + OFFSET_Y) for x in self.history[s_start:s_end]) - # update led bar - if "bar" in led_mode: - for i in reversed(range(6)): - leds.prep_hsv( - 5 + i, COLORS[5 + i] if vmin <= 0 and i <= vmin * -6 else (0, 0, 0) - ) - for i in reversed(range(6)): - leds.prep_hsv( - i, COLORS[i] if vmax >= 0 and 5 - i <= vmax * 6 else (0, 0, 0) + prev = next(draw_points) + for x, value in enumerate(draw_points): + self.disp.line( + x // timeWindow, prev, (x + 1) // timeWindow, value, col=COLOR_LINE ) - - # blink red on pulse - if ( - "pulse" in led_mode - and pulse > 0 - and samples_since_last_pulse < last_pulse_blink - ): - for i in range(4): - leds.prep(11 + i, (255, 0, 0)) - elif "pulse" in led_mode: - for i in range(4): - leds.prep(11 + i, (0, 0, 0)) - last_pulse_blink = samples_since_last_pulse - - leds.update() - - -def draw_graph(): - global disp, history, write, pause_screen, update_screen - - # skip rendering due to message beeing shown - if pause_screen == -1: - return - elif pause_screen > 0: - t = time.time_ms() - if t > pause_screen: - pause_screen = 0 - else: - return - - disp.clear(COLOR_BACKGROUND) - - # offset in pause_graph mode - timeWindow = config.get_option("Window") - window_end = int(len(history) - graph_offset) - s_end = max(0, window_end) - s_start = max(0, s_end - WIDTH * timeWindow) - - # get max value and calc scale - value_max = max(abs(x) for x in history[s_start:s_end]) - scale = SCALE_FACTOR / (value_max if value_max > 0 else 1) - - # draw graph - # values need to be inverted so high values are drawn with low pixel coordinates (at the top of the screen) - draw_points = (int(-x * scale + OFFSET_Y) for x in history[s_start:s_end]) - - prev = next(draw_points) - for x, value in enumerate(draw_points): - disp.line(x // timeWindow, prev, (x + 1) // timeWindow, value, col=COLOR_LINE) - prev = value - - # draw text: mode/bias/write - if pause_graph: - disp.print( - "Pause" - + ( - " -{:0.1f}s".format(graph_offset / config.get_option("Rate")) - if graph_offset > 0 - else "" - ), - posx=0, - posy=0, - fg=COLOR_TEXT, - ) - else: - led_range = last_sample_count if last_sample_count > 5 else 5 - draw_leds( - min(history[-led_range:]) / value_max, max(history[-led_range:]) / value_max - ) - if pulse < 0: - disp.print( - config.get_option("Mode") - + ("+Bias" if config.get_option("Bias") else ""), + prev = value + + # draw text: mode/bias/write + if self.pause_graph: + self.disp.print( + "Pause" + + ( + " -{:0.1f}s".format(self.graph_offset / config.get_option("Rate")) + if self.graph_offset > 0 + else "" + ), posx=0, posy=0, - fg=( - COLOR_MODE_FINGER - if config.get_option("Mode") == MODE_FINGER - else COLOR_MODE_USB - ), + fg=COLOR_TEXT, ) else: - disp.print( - "BPM: {}".format(pulse), - posx=0, - posy=0, - fg=( - COLOR_MODE_FINGER - if config.get_option("Mode") == MODE_FINGER - else COLOR_MODE_USB - ), + led_range = self.last_sample_count if self.last_sample_count > 5 else 5 + self.draw_leds( + min(self.history[-led_range:]) / value_max, + max(self.history[-led_range:]) / value_max, ) + if self.pulse < 0: + self.disp.print( + config.get_option("Mode") + + ("+Bias" if config.get_option("Bias") else ""), + posx=0, + posy=0, + fg=( + COLOR_MODE_FINGER + if config.get_option("Mode") == MODE_FINGER + else COLOR_MODE_USB + ), + ) + else: + self.disp.print( + "BPM: {}".format(self.pulse), + posx=0, + posy=0, + fg=( + COLOR_MODE_FINGER + if config.get_option("Mode") == MODE_FINGER + else COLOR_MODE_USB + ), + ) + + # announce writing ecg log + if self.write > 0: + t = time.time() + if self.write > 0 and t % 2 == 0: + self.disp.print( + "LOG", posx=0, posy=60, fg=COLOR_WRITE_FG, bg=COLOR_WRITE_BG + ) + + self.disp.update() + self.update_screen = 0 + + def main(self): + # show button layout + self.disp.clear(COLOR_BACKGROUND) + self.disp.print( + " BUTTONS ", posx=0, posy=0, fg=COLOR_TEXT, font=display.FONT20 + ) + self.disp.line(0, 20, 159, 20, col=COLOR_LINE) + self.disp.print( + " Pause >", posx=0, posy=28, fg=COLOR_MODE_FINGER, font=display.FONT16 + ) + self.disp.print( + " Settings >", posx=0, posy=44, fg=COLOR_MODE_USB, font=display.FONT16 + ) + self.disp.print( + "< WriteLog ", posx=0, posy=64, fg=COLOR_WRITE_BG, font=display.FONT16 + ) + self.disp.update() + time.sleep(3) - # announce writing ecg log - if write > 0: - t = time.time() - if write > 0 and t % 2 == 0: - disp.print("LOG", posx=0, posy=60, fg=COLOR_WRITE_FG, bg=COLOR_WRITE_BG) - - disp.update() - update_screen = 0 - - -def main(): - global pause_graph, graph_offset, pause_screen - - # show button layout - disp.clear(COLOR_BACKGROUND) - disp.print(" BUTTONS ", posx=0, posy=0, fg=COLOR_TEXT, font=display.FONT20) - disp.line(0, 20, 159, 20, col=COLOR_LINE) - disp.print( - " Pause >", posx=0, posy=28, fg=COLOR_MODE_FINGER, font=display.FONT16 - ) - disp.print( - " Settings >", posx=0, posy=44, fg=COLOR_MODE_USB, font=display.FONT16 - ) - disp.print( - "< WriteLog ", posx=0, posy=64, fg=COLOR_WRITE_BG, font=display.FONT16 - ) - disp.update() - time.sleep(3) - - # start ecg - open_sensor() - while True: - button_pressed = {"BOTTOM_LEFT": 0, "BOTTOM_RIGHT": 0, "TOP_RIGHT": 0} + # start ecg + self.open_sensor() while True: - v = buttons.read( - buttons.BOTTOM_LEFT | buttons.BOTTOM_RIGHT | buttons.TOP_RIGHT - ) - - # TOP RIGHT - # - # pause - - # down - if button_pressed["TOP_RIGHT"] == 0 and v & buttons.TOP_RIGHT != 0: - button_pressed["TOP_RIGHT"] = 1 - toggle_pause() - - # up - if button_pressed["TOP_RIGHT"] > 0 and v & buttons.TOP_RIGHT == 0: - button_pressed["TOP_RIGHT"] = 0 - - # BOTTOM LEFT - # - # on pause = shift view left - # else = toggle write - - # down - if button_pressed["BOTTOM_LEFT"] == 0 and v & buttons.BOTTOM_LEFT != 0: - button_pressed["BOTTOM_LEFT"] = 1 - if pause_graph: - l = len(history) - graph_offset += config.get_option("Rate") / 2 - if l - graph_offset < WIDTH * config.get_option("Window"): - graph_offset = l - WIDTH * config.get_option("Window") - else: - toggle_write() - - # up - if button_pressed["BOTTOM_LEFT"] > 0 and v & buttons.BOTTOM_LEFT == 0: - button_pressed["BOTTOM_LEFT"] = 0 - - # BOTTOM RIGHT - # - # on pause = shift view right - # else = show settings - - # down - if button_pressed["BOTTOM_RIGHT"] == 0 and v & buttons.BOTTOM_RIGHT != 0: - button_pressed["BOTTOM_RIGHT"] = 1 - if pause_graph: - graph_offset -= config.get_option("Rate") / 2 - graph_offset -= graph_offset % (config.get_option("Rate") / 2) - if graph_offset < 0: - graph_offset = 0 - else: - pause_screen = -1 # hide graph - leds.clear() # disable all LEDs - if ble_streaming and not config.get_option("BLE Disp"): - disp.backlight(20) - - config.run() # show config menu - - if ble_streaming and not config.get_option("BLE Disp"): - disp.backlight(0) - close_sensor() # reset sensor in case mode or bias was changed TODO do not close sensor otherwise? - open_sensor() - pause_screen = 0 # start plotting graph again - # returning from menu was by pressing the TOP_RIGHT button + button_pressed = {"BOTTOM_LEFT": 0, "BOTTOM_RIGHT": 0, "TOP_RIGHT": 0} + while True: + v = buttons.read( + buttons.BOTTOM_LEFT | buttons.BOTTOM_RIGHT | buttons.TOP_RIGHT + ) + + # TOP RIGHT + # + # pause + + # down + if button_pressed["TOP_RIGHT"] == 0 and v & buttons.TOP_RIGHT != 0: button_pressed["TOP_RIGHT"] = 1 - - # up - if button_pressed["BOTTOM_RIGHT"] > 0 and v & buttons.BOTTOM_RIGHT == 0: - button_pressed["BOTTOM_RIGHT"] = 0 + self.toggle_pause() + + # up + if button_pressed["TOP_RIGHT"] > 0 and v & buttons.TOP_RIGHT == 0: + button_pressed["TOP_RIGHT"] = 0 + + # BOTTOM LEFT + # + # on pause = shift view left + # else = toggle write + + # down + if button_pressed["BOTTOM_LEFT"] == 0 and v & buttons.BOTTOM_LEFT != 0: + button_pressed["BOTTOM_LEFT"] = 1 + if self.pause_graph: + l = len(self.history) + self.graph_offset += config.get_option("Rate") / 2 + if l - self.graph_offset < WIDTH * config.get_option("Window"): + self.graph_offset = l - WIDTH * config.get_option("Window") + else: + self.toggle_write() + + # up + if button_pressed["BOTTOM_LEFT"] > 0 and v & buttons.BOTTOM_LEFT == 0: + button_pressed["BOTTOM_LEFT"] = 0 + + # BOTTOM RIGHT + # + # on pause = shift view right + # else = show settings + + # down + if ( + button_pressed["BOTTOM_RIGHT"] == 0 + and v & buttons.BOTTOM_RIGHT != 0 + ): + button_pressed["BOTTOM_RIGHT"] = 1 + if self.pause_graph: + self.graph_offset -= config.get_option("Rate") / 2 + self.graph_offset -= self.graph_offset % ( + config.get_option("Rate") / 2 + ) + if self.graph_offset < 0: + self.graph_offset = 0 + else: + self.pause_screen = -1 # hide graph + leds.clear() # disable all LEDs + if self.ble_streaming and not config.get_option("BLE Disp"): + self.disp.backlight(20) + + config.run() # show config menu + + if self.ble_streaming and not config.get_option("BLE Disp"): + self.disp.backlight(0) + self.close_sensor() # reset sensor in case mode or bias was changed TODO do not close sensor otherwise? + self.open_sensor() + self.pause_screen = 0 # start plotting graph again + # returning from menu was by pressing the TOP_RIGHT button + button_pressed["TOP_RIGHT"] = 1 + + # up + if button_pressed["BOTTOM_RIGHT"] > 0 and v & buttons.BOTTOM_RIGHT == 0: + button_pressed["BOTTOM_RIGHT"] = 0 if __name__ == "__main__": + ecg = ECG() try: - main() + ecg.main() except KeyboardInterrupt as e: - sensor.close() + ecg.close_sensor() raise e -- GitLab