diff --git a/tools/card10-ble-ecg-streaming.py b/tools/card10-ble-ecg-streaming.py new file mode 100755 index 0000000000000000000000000000000000000000..e4ec000370a179f341ac49394eda5745e1516ef9 --- /dev/null +++ b/tools/card10-ble-ecg-streaming.py @@ -0,0 +1,176 @@ +#!/usr/bin/env python3 + +import bluepy +import time +import struct +import argparse +import numpy as np +import threading + +from datetime import datetime +import matplotlib + +matplotlib.use("GTK3Agg") + +import matplotlib.pyplot as plt +import matplotlib.animation as animation + +from scipy import signal + +# Set this to true to invert the polarity of the data +invert = True + +# Default sample rate of the ECG app +sample_rate = 128 + +# Larger FFT length gives more BPM resoultion, but takes longer to settle +fft_len = 1024 + +# Low/High section of the FFT which is shown. In BPM. +fft_low_freq = 40 +fft_high_freq = 200 + +# If you want to see 50/60 Hz line noise in the FFT +# fft_high_freq = 60*60 + +fft_low_bin = int(fft_low_freq / 60 / (sample_rate / fft_len)) +fft_high_bin = int(fft_high_freq / 60 / (sample_rate / fft_len)) + +# 50 Hz notch filter +b, a = signal.iirnotch(50, 50, sample_rate) + +ecg_data = [] + +# Create figure for plotting +fig, ax = plt.subplots(2, 2, sharey=False) +fig.suptitle("card10 BLE ECG Streaming") +fig.canvas.manager.set_window_title("card10 BLE ECG Streaming") + +# This function is called periodically from FuncAnimation +def animate(i): + global ecg_data + + # Limit displayed data + xs = np.array(range(len(ecg_data))[-fft_len:]) / sample_rate + ys = np.array(ecg_data[-fft_len:]) + + if len(ys) == 0: + return + + if invert: + ys *= -1 + + ys_filt = signal.filtfilt(b, a, ys) + + # Draw x and y lists + ax[0, 0].clear() + ax[0, 0].plot(xs, ys) + + ax[1, 0].clear() + ax[1, 0].plot(xs, ys_filt) + + if len(ys) == fft_len: + ax[0, 1].clear() + fft = abs(np.fft.fft(ys)[fft_low_bin:fft_high_bin]) + ax[0, 1].plot( + np.linspace(fft_low_freq, fft_high_freq, fft_high_bin - fft_low_bin), fft + ) + + am = np.argmax(fft) + fft_low_bin + + print("BPM:", am * sample_rate / fft_len * 60) + + ax[1, 1].clear() + fft = abs(np.fft.fft(ys)[fft_low_bin:fft_high_bin]) + ax[1, 1].plot( + np.linspace(fft_low_freq, fft_high_freq, fft_high_bin - fft_low_bin), fft + ) + + am = np.argmax(fft) + fft_low_bin + + print("BPM(filt):", am * sample_rate / fft_len * 60) + + # Format plot + # Needs to be here as the clear() call above also clears parts of the formating + ax[0, 0].set_title("ECG Data") + ax[0, 0].set_ylabel("Voltage") + ax[0, 0].set_xlabel("Time [s]") + + ax[1, 0].set_title("ECG Data (50 Hz filtered)") + ax[1, 0].set_ylabel("Voltage") + ax[1, 0].set_xlabel("Time [s]") + + ax[0, 1].set_title("FFT of ECG Data") + ax[0, 1].set_xlabel("Frequency [BPM]") + + ax[1, 1].set_title("FFT of ECG Data (50 Hz filtered)") + ax[1, 1].set_xlabel("Frequency [BPM]") + + +def plot_thread(): + # Set up plot to call animate() function periodically + ani = animation.FuncAnimation(fig, animate, interval=500) + plt.show() + + +def main() -> None: + parser = argparse.ArgumentParser( + description="""\ +Transfer sensor data using Bluetooth Low Energy. +""" + ) + + parser.add_argument( + "mac", help="BT MAC address of the card10. Format: CA:4D:10:XX:XX:XX" + ) + + args = parser.parse_args() + + t0 = time.time() + p = bluepy.btle.Peripheral(args.mac) + + # We need a larger MTU than the default MTU + p.setMTU(90) + + c = p.getCharacteristics(uuid="42230301-2342-2342-2342-234223422342")[0] + + # Enable streaming + c.getDescriptors()[0].write(b"\x01\x00") + + print("Connection setup time:", int(time.time() - t0), "seconds") + + class ECGDelegate(bluepy.btle.DefaultDelegate): + def __init__(self): + bluepy.btle.DefaultDelegate.__init__(self) + self.t0 = time.time() + + def handleNotification(self, cHandle, data): + global ecg_data + + if cHandle == c.valHandle: + index, *volts = struct.unpack( + ">H" + ("h" * ((len(data) // 2) - 1)), data + ) + t = time.time() + + dt = t - self.t0 + self.t0 = t + + print(f"{dt:.3f}", cHandle, index, volts) + ecg_data += volts + + p.setDelegate(ECGDelegate()) + + pt = threading.Thread(target=plot_thread) + pt.daemon = True + pt.start() + + while pt.is_alive(): + # All the magic happens in the delegate and the plotting thread. + # We just spin in here and tell bluepy that we want to receive + # notification. + p.waitForNotifications(1.0) + + +if __name__ == "__main__": + main()