Skip to content
Snippets Groups Projects
Commit edcff13e authored by schneider's avatar schneider
Browse files

feat(spo2): Run Maxim RD117 algo in app

parent 43e05fc9
Branches
Tags
1 merge request!414SpO2 sensor improvements
......@@ -2,17 +2,21 @@ import max86150
import display
import utime
import buttons
import spo2_algo
import utime
import color
class SPO2:
def __init__(self):
self.sensor = None
self.RATE = 100
self.HISTORY_MAX = self.RATE * 4
self.history = []
self.HISTORY_MAX = self.RATE * 5
self.ir_history = []
self.red_history = []
self.update_screen = 0
self.disp = display.open()
self.DRAW_AFTER_SAMPLES = 5
self.DRAW_AFTER_SAMPLES = 100
self.histogram_offset = 0
self.WIDTH = 160
self.SCALE_FACTOR = 30
......@@ -22,9 +26,10 @@ class SPO2:
self.avg_pos = 0
self.last_sample = 0.0
self.filtered_value = 0.0
self.source = "Red"
self.average = 0
self.prev_w = 0
self.prev_w_ir = 0
self.prev_w_red = 0
self.t0 = utime.time_ms()
def open(self):
def callback(datasets):
......@@ -34,30 +39,65 @@ class SPO2:
# don't update on every callback
if self.update_screen >= self.DRAW_AFTER_SAMPLES:
self.draw_histogram()
self.disp.clear(self.COLOR_BACKGROUND)
self.draw_histogram(self.ir_history, color.RED)
self.draw_histogram(self.red_history, color.GREEN)
spo2, spo2_valid, hr, hr_valid = spo2_algo.maxim_rd117(self.ir_history, self.red_history)
print(utime.time_ms() - self.t0, spo2, spo2_valid, hr, hr_valid)
self.t0 = utime.time_ms()
if hr_valid:
self.disp.print('HR: {0:3} bpm'.format(hr), posy=20)
else:
self.disp.print('HR: --- bpm'.format(hr), posy=20)
if spo2_valid:
self.disp.print('SpO2: {0:3}%'.format(spo2), posy=0)
else:
self.disp.print('SpO2: ---%'.format(spo2), posy=0)
self.disp.update()
"""
if hr_valid and hr > x:
with open("ir.txt", "w") as ir:
for sample in self.ir_history:
ir.write("%d\n" % sample)
with open("red.txt", "w") as ir:
for sample in self.red_history:
ir.write("%d\n" % sample)
print("Wrote data")
while True: pass
"""
self.update_screen = 0
# 4x over sampling is active ATM
self.sensor = max86150.MAX86150(callback=callback, sample_rate=self.RATE * 4)
while True:
utime.sleep(.1)
if buttons.read(buttons.BOTTOM_RIGHT):
if self.source == "Red":
self.source = "IR"
else:
self.source = "Red"
pass
while buttons.read(buttons.BOTTOM_RIGHT): pass
def update_history(self, datasets):
alpha = 0.9975
for val in datasets:
print("%d,%d" % (val.red, val.infrared))
if self.source == "Red":
d = val.red
else:
d = val.infrared
#print("%d,%d" % (val.red, val.infrared))
self.ir_history.append(val.infrared)
self.red_history.append(val.red)
"""
w_ir = val.infrared + self.prev_w_ir * 0.95
self.ir_history.append(w_ir - self.prev_w_ir)
self.prev_w = w_ir
w_red = val.red + self.prev_w_red * 0.95
self.red_history.append(w - self.prev_w_red)
self.prev_w = w_red
"""
w = d + self.prev_w * 0.95
self.history.append(w - self.prev_w)
self.prev_w = w
"""
self.avg[self.avg_pos] = d
......@@ -72,44 +112,35 @@ class SPO2:
self.filtered_value + avg_data - self.last_sample
)
self.last_sample = avg_data
self.history.append(self.filtered_value)
"""
"""
self.average = alpha * self.average + (1 - alpha) * d
d -= self.average
self.history.append(d)
self.ir_history.append(self.filtered_value)
"""
# trim old elements
self.history = self.history[-self.HISTORY_MAX :]
def draw_histogram(self):
self.disp.clear(self.COLOR_BACKGROUND)
self.disp.print('Source: {0} '.format(self.source))
self.ir_history = self.ir_history[-self.HISTORY_MAX :]
self.red_history = self.red_history[-self.HISTORY_MAX :]
def draw_histogram(self, history, col):
# offset in pause_histogram mode
window_end = len(self.history) - self.histogram_offset
window_end = len(history) - self.histogram_offset
s_start = max(0, window_end - (self.RATE * 2))
s_end = max(0, window_end)
s_draw = max(0, s_end - self.WIDTH)
average = sum(history[s_start:s_end]) / (s_end - s_start)
# get max value and calc scale
value_max = max(abs(x) for x in self.history[s_start:s_end])
value_max = max(abs(x - average) for x in history[s_start:s_end])
scale = self.SCALE_FACTOR / (value_max if value_max > 0 else 1)
# draw histogram
draw_points = (
int(x * scale + self.OFFSET_Y) for x in self.history[s_draw:s_end]
int((x - average) * scale + self.OFFSET_Y) for x in history[s_draw:s_end]
)
prev = next(draw_points)
for x, value in enumerate(draw_points):
self.disp.line(x, prev, x + 1, value)
self.disp.line(x, prev, x + 1, value, col=col)
prev = value
self.disp.update()
self.update_screen = 0
def close(self):
if self.self is not None:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment