Skip to content
Snippets Groups Projects
Commit a7a2174f authored by schneider's avatar schneider
Browse files

ecg: Add demo application for BLE ECG streaming

parent 66a27534
No related branches found
No related tags found
No related merge requests found
#!/usr/bin/env python3
import bluepy
import time
import struct
import argparse
import numpy as np
import threading
from datetime import datetime
import matplotlib
matplotlib.use("GTK3Agg")
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from scipy import signal
# Set this to true to invert the polarity of the data
invert = True
# Default sample rate of the ECG app
sample_rate = 128
# Larger FFT length gives more BPM resoultion, but takes longer to settle
fft_len = 1024
# Low/High section of the FFT which is shown. In BPM.
fft_low_freq = 40
fft_high_freq = 200
# If you want to see 50/60 Hz line noise in the FFT
# fft_high_freq = 60*60
fft_low_bin = int(fft_low_freq / 60 / (sample_rate / fft_len))
fft_high_bin = int(fft_high_freq / 60 / (sample_rate / fft_len))
# 50 Hz notch filter
b, a = signal.iirnotch(50, 50, sample_rate)
ecg_data = []
# Create figure for plotting
fig, ax = plt.subplots(2, 2, sharey=False)
fig.suptitle("card10 BLE ECG Streaming")
fig.canvas.manager.set_window_title("card10 BLE ECG Streaming")
# This function is called periodically from FuncAnimation
def animate(i):
global ecg_data
# Limit displayed data
xs = np.array(range(len(ecg_data))[-fft_len:]) / sample_rate
ys = np.array(ecg_data[-fft_len:])
if len(ys) == 0:
return
if invert:
ys *= -1
ys_filt = signal.filtfilt(b, a, ys)
# Draw x and y lists
ax[0, 0].clear()
ax[0, 0].plot(xs, ys)
ax[1, 0].clear()
ax[1, 0].plot(xs, ys_filt)
if len(ys) == fft_len:
ax[0, 1].clear()
fft = abs(np.fft.fft(ys)[fft_low_bin:fft_high_bin])
ax[0, 1].plot(
np.linspace(fft_low_freq, fft_high_freq, fft_high_bin - fft_low_bin), fft
)
am = np.argmax(fft) + fft_low_bin
print("BPM:", am * sample_rate / fft_len * 60)
ax[1, 1].clear()
fft = abs(np.fft.fft(ys)[fft_low_bin:fft_high_bin])
ax[1, 1].plot(
np.linspace(fft_low_freq, fft_high_freq, fft_high_bin - fft_low_bin), fft
)
am = np.argmax(fft) + fft_low_bin
print("BPM(filt):", am * sample_rate / fft_len * 60)
# Format plot
# Needs to be here as the clear() call above also clears parts of the formating
ax[0, 0].set_title("ECG Data")
ax[0, 0].set_ylabel("Voltage")
ax[0, 0].set_xlabel("Time [s]")
ax[1, 0].set_title("ECG Data (50 Hz filtered)")
ax[1, 0].set_ylabel("Voltage")
ax[1, 0].set_xlabel("Time [s]")
ax[0, 1].set_title("FFT of ECG Data")
ax[0, 1].set_xlabel("Frequency [BPM]")
ax[1, 1].set_title("FFT of ECG Data (50 Hz filtered)")
ax[1, 1].set_xlabel("Frequency [BPM]")
def plot_thread():
# Set up plot to call animate() function periodically
ani = animation.FuncAnimation(fig, animate, interval=500)
plt.show()
def main() -> None:
parser = argparse.ArgumentParser(
description="""\
Transfer sensor data using Bluetooth Low Energy.
"""
)
parser.add_argument(
"mac", help="BT MAC address of the card10. Format: CA:4D:10:XX:XX:XX"
)
args = parser.parse_args()
t0 = time.time()
p = bluepy.btle.Peripheral(args.mac)
# We need a larger MTU than the default MTU
p.setMTU(90)
c = p.getCharacteristics(uuid="42230301-2342-2342-2342-234223422342")[0]
# Enable streaming
c.getDescriptors()[0].write(b"\x01\x00")
print("Connection setup time:", int(time.time() - t0), "seconds")
class ECGDelegate(bluepy.btle.DefaultDelegate):
def __init__(self):
bluepy.btle.DefaultDelegate.__init__(self)
self.t0 = time.time()
def handleNotification(self, cHandle, data):
global ecg_data
if cHandle == c.valHandle:
index, *volts = struct.unpack(
">H" + ("h" * ((len(data) // 2) - 1)), data
)
t = time.time()
dt = t - self.t0
self.t0 = t
print(f"{dt:.3f}", cHandle, index, volts)
ecg_data += volts
p.setDelegate(ECGDelegate())
pt = threading.Thread(target=plot_thread)
pt.daemon = True
pt.start()
while pt.is_alive():
# All the magic happens in the delegate and the plotting thread.
# We just spin in here and tell bluepy that we want to receive
# notification.
p.waitForNotifications(1.0)
if __name__ == "__main__":
main()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment