Skip to content
Snippets Groups Projects
Commit a126e425 authored by schneider's avatar schneider
Browse files

Merge branch 'master' into 'master'

max86150 pycardium api

See merge request !359
parents da4bc11f d10d99fb
Branches
No related tags found
No related merge requests found
import max86150
import display
class SPO2:
def __init__(self):
self.sensor = None
self.RATE = 128
self.HISTORY_MAX = self.RATE * 4
self.history = []
self.update_screen = 0
self.disp = display.open()
self.DRAW_AFTER_SAMPLES = 5
self.histogram_offset = 0
self.WIDTH = 160
self.SCALE_FACTOR = 30
self.OFFSET_Y = 49
self.COLOR_BACKGROUND = [0, 0, 0]
self.avg = [0] * 10
self.avg_pos = 0
self.last_sample = 0.0
self.filtered_value = 0.0
def open(self):
def callback(datasets):
self.update_screen += len(datasets)
self.update_history(datasets)
# don't update on every callback
if self.update_screen >= self.DRAW_AFTER_SAMPLES:
self.draw_histogram()
self.sensor = max86150.MAX86150(callback)
while True:
pass
def update_history(self, datasets):
for val in datasets:
# get red value (first in tuple)
self.avg[self.avg_pos] = val.red
if self.avg_pos < 9:
self.avg_pos += 1
else:
self.avg_pos = 0
avg_data = sum(self.avg) / 10
# DC offset removal
self.filtered_value = 0.9 * (
self.filtered_value + avg_data - self.last_sample
)
self.last_sample = avg_data
self.history.append(self.filtered_value)
# trim old elements
self.history = self.history[-self.HISTORY_MAX :]
def draw_histogram(self):
self.disp.clear(self.COLOR_BACKGROUND)
# offset in pause_histogram mode
window_end = len(self.history) - self.histogram_offset
s_start = max(0, window_end - (self.RATE * 2))
s_end = max(0, window_end)
s_draw = max(0, s_end - self.WIDTH)
# get max value and calc scale
value_max = max(abs(x) for x in self.history[s_start:s_end])
scale = self.SCALE_FACTOR / (value_max if value_max > 0 else 1)
# draw histogram
draw_points = (
int(x * scale + self.OFFSET_Y) for x in self.history[s_draw:s_end]
)
prev = next(draw_points)
for x, value in enumerate(draw_points):
self.disp.line(x, prev, x + 1, value)
prev = value
self.disp.update()
self.update_screen = 0
def close(self):
if self.self is not None:
self.sensor.close()
self.sensor = None
if __name__ == "__main__":
sensor = SPO2()
try:
sensor.open()
except KeyboardInterrupt as e:
sensor.close()
raise e
{"name":"SPO2","description":"A simple oximetetry monitor.","category":"hardware","author":"card10 contributors","revision":-1,"source":"preload"}
......@@ -9,6 +9,7 @@ modsrc = files(
'modules/interrupt.c',
'modules/light_sensor.c',
'modules/max30001-sys.c',
'modules/max86150.c',
'modules/os.c',
'modules/personal_state.c',
'modules/power.c',
......
......@@ -97,6 +97,8 @@ static const mp_rom_map_elem_t interrupt_module_globals_table[] = {
MP_OBJ_NEW_SMALL_INT(EPIC_INT_BHI160_GYROSCOPE) },
{ MP_ROM_QSTR(MP_QSTR_MAX30001_ECG),
MP_OBJ_NEW_SMALL_INT(EPIC_INT_MAX30001_ECG) },
{ MP_ROM_QSTR(MP_QSTR_MAX86150),
MP_OBJ_NEW_SMALL_INT(EPIC_INT_MAX86150) },
};
static MP_DEFINE_CONST_DICT(
......
#include "py/obj.h"
#include "py/objlist.h"
#include "py/runtime.h"
#include "py/builtin.h"
#include "api/common.h"
#include "mphalport.h"
#include "epicardium.h"
STATIC mp_obj_t mp_max86150_enable_sensor(size_t n_args, const mp_obj_t *args)
{
struct max86150_sensor_config cfg = { 0 };
cfg.sample_buffer_len = mp_obj_get_int(args[0]);
cfg.ppg_sample_rate = mp_obj_get_int(args[1]);
int stream_id = epic_max86150_enable_sensor(&cfg, sizeof(cfg));
if (stream_id < 0) {
mp_raise_OSError(-stream_id);
}
return MP_OBJ_NEW_SMALL_INT(stream_id);
}
STATIC MP_DEFINE_CONST_FUN_OBJ_VAR_BETWEEN(
mp_max86150_enable_sensor_obj, 2, 2, mp_max86150_enable_sensor
);
STATIC mp_obj_t mp_max86150_read_sensor(mp_obj_t stream_id_in)
{
// do not use too big buf
// 256*12 is close to stack size of 4096, reduced to 64
struct max86150_sensor_data buf[64];
int stream_id = mp_obj_get_int(stream_id_in);
int n = epic_stream_read(stream_id, buf, sizeof(buf));
mp_obj_list_t *list = mp_obj_new_list(0, NULL);
for (int i = 0; i < n; i++) {
mp_obj_t tuple[3];
tuple[0] = mp_obj_new_int(buf[i].red);
tuple[1] = mp_obj_new_int(buf[i].ir);
tuple[2] = mp_obj_new_int(buf[i].ecg);
mp_obj_list_append(list, mp_obj_new_tuple(3, tuple));
}
return MP_OBJ_FROM_PTR(list);
}
STATIC MP_DEFINE_CONST_FUN_OBJ_1(
mp_max86150_read_sensor_obj, mp_max86150_read_sensor
);
STATIC mp_obj_t mp_max86150_disable_sensor(void)
{
int ret = epic_max86150_disable_sensor();
if (ret != 0) {
mp_raise_OSError(-ret);
}
return mp_const_none;
}
STATIC MP_DEFINE_CONST_FUN_OBJ_0(
mp_max86150_disable_sensor_obj, mp_max86150_disable_sensor
);
STATIC const mp_rom_map_elem_t max86150_module_globals_table[] = {
{ MP_ROM_QSTR(MP_QSTR___name__), MP_ROM_QSTR(MP_QSTR_sys_max86150) },
{ MP_ROM_QSTR(MP_QSTR_enable_sensor),
MP_ROM_PTR(&mp_max86150_enable_sensor_obj) },
{ MP_ROM_QSTR(MP_QSTR_read_sensor),
MP_ROM_PTR(&mp_max86150_read_sensor_obj) },
{ MP_ROM_QSTR(MP_QSTR_disable_sensor),
MP_ROM_PTR(&mp_max86150_disable_sensor_obj) },
};
STATIC MP_DEFINE_CONST_DICT(
max86150_module_globals, max86150_module_globals_table
);
const mp_obj_module_t max86150_module = {
.base = { &mp_type_module },
.globals = (mp_obj_dict_t *)&max86150_module_globals,
};
/* clang-format off */
/* Register the module to make it available in Python */
MP_REGISTER_MODULE(MP_QSTR_sys_max86150, max86150_module, MODULE_MAX86150_ENABLED);
import sys_max86150
import uerrno
import interrupt
import ucollections
Max86150Data = ucollections.namedtuple("Max86150Data", ["red", "infrared", "ecg"])
class MAX86150:
"""
The MAX86150 class provides a stram interface to the MAX86150 PPG and ECG.
.. code-block:: python
import MAX86150
m = max86150.MAX86150()
m.read()
m.close()
"""
def __init__(self, callback=None, sample_buffer_len=128, sample_rate=200):
"""
Initializes the MAX86150 (if it is not already running).
:param callback: If not None: A callback which is called with the data when ever new data is available
"""
self.active = False
self.stream_id = -uerrno.ENODEV
self.interrupt_id = interrupt.MAX86150
self._callback = callback
self.sample_rate = sample_rate
self.sample_buffer_len = sample_buffer_len
self.enable_sensor()
def enable_sensor(self):
"""
Enables the sensor. Automatically called by __init__.
"""
interrupt.disable_callback(self.interrupt_id)
interrupt.set_callback(self.interrupt_id, self._interrupt)
self.stream_id = sys_max86150.enable_sensor(
self.sample_buffer_len, self.sample_rate
)
self.active = True
if self._callback:
interrupt.enable_callback(self.interrupt_id)
def __enter__(self):
return self
def __exit__(self, _et, _ev, _t):
self.close()
def close(self):
"""
Close the currently open connection to the sensor.
"""
if self.active:
self.active = False
sys_max86150.disable_sensor()
interrupt.disable_callback(self.interrupt_id)
interrupt.set_callback(self.interrupt_id, None)
def read(self):
"""
Read as many samples (signed integer) as currently available.
"""
assert self.active, "Sensor is inactive"
result = []
for sample in sys_max86150.read_sensor(self.stream_id):
result.append(self._convert(sample))
return result
def _convert(self, sample):
return Max86150Data(sample[0], sample[1], sample[2])
def _interrupt(self, _):
if self.active:
data = self.read()
if self._callback:
self._callback(data)
......@@ -7,6 +7,7 @@ python_modules = files(
'ledfx.py',
'leds.py',
'max30001.py',
'max86150.py',
'pride.py',
'simple_menu.py',
......
......@@ -181,7 +181,9 @@ Q(CHAOS)
Q(COMMUNICATION)
Q(CAMP)
/* required for interrupts */
Q(MAX30001_ECG)
Q(MAX86150)
/* ws2812 */
Q(ws2812)
......
......@@ -49,6 +49,7 @@ int mp_hal_trng_read_int(void);
/* Modules */
#define MODULE_BHI160_ENABLED (1)
#define MODULE_MAX86150_ENABLED (1)
#define MODULE_BME680_ENABLED (1)
#define MODULE_BUTTONS_ENABLED (1)
#define MODULE_DISPLAY_ENABLED (1)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment