Skip to content
Snippets Groups Projects
Commit 4f4851ba authored by fuchsi*'s avatar fuchsi*
Browse files

Merge branch 'ecg-bpm-counter' into 'master'

style(preload): (ECG) Semantic variable names in pulse detection

See merge request !1
parents 45edc3eb 66497153
No related branches found
No related tags found
1 merge request!1style(preload): (ECG) Semantic variable names in pulse detection
Pipeline #3672 passed
......@@ -49,9 +49,9 @@ beta = 3
def update_history(datasets):
global history, moving_average, alpha, beta
for i in range(len(datasets)):
history.append(datasets[i] - moving_average)
moving_average = (alpha * moving_average + beta * datasets[i]) / (alpha + beta)
for val in datasets:
history.append(val - moving_average)
moving_average = (alpha * moving_average + beta * val) / (alpha + beta)
# trim old elements
history = history[-HISTORY_MAX:]
......@@ -65,25 +65,35 @@ r_threshold = 1
q_spike = -ECG_RATE
def neighbours(n, lst):
"""
neighbours(2, "ABCDE") = ("AB", "BC", "CD", "DE")
neighbours(3, "ABCDE") = ("ABC", "BCD", "CDE")
"""
for i in range(len(lst) - (n - 1)):
yield lst[i : i + n]
def detect_pulse(num_new_samples):
global history, pulse, samples_since_last_pulse, q_threshold, r_threshold, q_spike
for i in range(len(history) - num_new_samples - 1, len(history) - 1):
if i <= 0:
continue
# look at 3 consecutive samples, starting 2 samples before the samples that were just added, e.g.:
# existing samples: "ABCD"
# new samples: "EF" => "ABCDEF"
# consider ["CDE", "DEF"]
# new samples: "GHI" => "ABCDEFGHI"
# consider ["EFG", "FGH", "GHI"]
for [prev, cur, next_] in neighbours(3, history[-(num_new_samples + 2) :]):
samples_since_last_pulse += 1
if (
history[i - 1] > history[i]
and history[i + 1] > history[i]
and history[i] < q_threshold
):
if prev > cur < next_ and cur < q_threshold:
q_spike = samples_since_last_pulse
# we expect the next q-spike to be at least 60% as high as this one
q_threshold = (history[i] * 3) // 5
q_threshold = (cur * 3) // 5
elif (
history[i - 1] < history[i]
and history[i + 1] < history[i]
and history[i] > r_threshold
prev < cur > next_
and cur > r_threshold
and samples_since_last_pulse - q_spike < ECG_RATE // 10
):
# the full QRS complex is < 0.1s long, so the q and r spike in particular cannot be more than ECG_RATE//10 samples apart
......@@ -93,7 +103,7 @@ def detect_pulse(num_new_samples):
if pulse < 30 or pulse > 210:
pulse = -1
# we expect the next r-spike to be at least 60% as high as this one
r_threshold = (history[i] * 3) // 5
r_threshold = (cur * 3) // 5
elif samples_since_last_pulse > 2 * ECG_RATE:
q_threshold = -1
r_threshold = 1
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment