diff --git a/tools/ble-ecg-sub-no-gui.py b/tools/ble-ecg-sub-no-gui.py new file mode 100755 index 0000000000000000000000000000000000000000..3f070de73f403c00d05cd4881312ecd8808d23ee --- /dev/null +++ b/tools/ble-ecg-sub-no-gui.py @@ -0,0 +1,65 @@ +#!/usr/bin/env python3 + +import bluepy +import time +import binascii +import struct +import sys +import argparse +import os +import numpy as np +import threading + +def main() -> None: + parser = argparse.ArgumentParser( + description="""\ +Transfer sensor data using Bluetooth Low Energy. +""" + ) + + parser.add_argument( + "mac", help="BT MAC address of the card10. Format: CA:4D:10:XX:XX:XX" + ) + + args = parser.parse_args() + + t0 = time.time() + p = bluepy.btle.Peripheral(args.mac) + + #print(dir(p)) + + p.setMTU(90) + + c = p.getCharacteristics(uuid='42230301-2342-2342-2342-234223422342')[0] + c.getDescriptors()[0].write(b'\x01\x00') + #c.getDescriptors()[0].write(b'\x00\x00') + + print("Connection setup time:", int(time.time() - t0), "seconds") + + class MyDelegate(bluepy.btle.DefaultDelegate): + def __init__(self): + bluepy.btle.DefaultDelegate.__init__(self) + self.t0 = time.time() + + def handleNotification(self, cHandle, data): + #temp = struct.unpack("<H", data)[0] / 100 + + if cHandle == 514: + + #print(cHandle, data, len(data), len(data) / 2) + volts = struct.unpack(">" + ("h" * (len(data) // 2)), data) + t = time.time() + + dt = t-self.t0 + #print(f'{dt:.3f}', cHandle, temp) + #print(f'{dt:.3f}', cHandle, list(data)) + print(f'{dt:.3f}', cHandle, volts) + self.t0 = t + + p.setDelegate(MyDelegate()) + + while True: + p.waitForNotifications(10.0) + +if __name__ == "__main__": + main() diff --git a/tools/ble-ecg-sub.py b/tools/ble-ecg-sub.py new file mode 100755 index 0000000000000000000000000000000000000000..c7044f8903fa85a51bbba5288aa601f273cd377e --- /dev/null +++ b/tools/ble-ecg-sub.py @@ -0,0 +1,163 @@ +#!/usr/bin/env python3 + +import bluepy +import time +import binascii +import struct +import sys +import argparse +import os +import numpy as np +import threading + +import datetime as dt +import matplotlib +matplotlib.use('GTK3Agg') + +import matplotlib.pyplot as plt +import matplotlib.animation as animation + +from scipy import signal + +# Create figure for plotting +#fig = plt.figure() +fig, ax = plt.subplots(2, 2, sharey=False) +print(ax[0]) +print(ax[1]) +xs = [] +ys = [] + +ecg = [] + +sample_rate = 128 +fft_len = 1024 // 4 +fft_low_freq = 40 +fft_high_freq = 60*60 + +fft_low_bin = int(fft_low_freq/60 / (sample_rate/fft_len)) +fft_high_bin = int(fft_high_freq/60 / (sample_rate/fft_len)) + + +b, a = signal.iirnotch(50, 50, sample_rate) + +# This function is called periodically from FuncAnimation +def animate(i, xs, ys): + global ecg + + # Read temperature (Celsius) from TMP102 + + # Add x and y to lists + xs.append(dt.datetime.now().strftime('%H:%M:%S.%f')) + #ys.append(temp_c) + + xs = range(len(ecg)) + ys = ecg[:] + + # Limit x and y lists to 20 items + xs = xs[-fft_len:] + ys = np.array(ys[-fft_len:]) * -1 + + + # Draw x and y lists + ax[0, 0].clear() + ax[0, 0].plot(xs, ys) + + if len(ys) == fft_len: + ax[0, 1].clear() + fft = abs(np.fft.fft(ys)[fft_low_bin:fft_high_bin]) + ax[0, 1].plot(np.linspace(fft_low_freq, fft_high_freq, fft_high_bin - fft_low_bin), fft) + + am = np.argmax(fft) + fft_low_bin + + print("BPM:", am * sample_rate/fft_len * 60) + + if len(ys) == 0: + return + ys = signal.filtfilt(b, a, ys) + + # Draw x and y lists + ax[1, 0].clear() + ax[1, 0].plot(xs, ys) + + if len(ys) == fft_len: + ax[1, 1].clear() + fft = abs(np.fft.fft(ys)[fft_low_bin:fft_high_bin]) + ax[1, 1].plot(np.linspace(fft_low_freq, fft_high_freq, fft_high_bin - fft_low_bin), fft) + + am = np.argmax(fft) + fft_low_bin + + print("BPM(filt):", am * sample_rate/fft_len * 60) + + + # Format plot + plt.xticks(rotation=45, ha='right') + plt.subplots_adjust(bottom=0.30) + plt.title('TMP102 Temperature over Time') + plt.ylabel('Temperature (deg C)') + +def plot_thread(): + # Set up plot to call animate() function periodically + ani = animation.FuncAnimation(fig, animate, fargs=(xs, ys), interval=500) + plt.show() + +threading.Thread(target=plot_thread).start() +#plot_thread() + + +def main() -> None: + parser = argparse.ArgumentParser( + description="""\ +Transfer sensor data using Bluetooth Low Energy. +""" + ) + + parser.add_argument( + "mac", help="BT MAC address of the card10. Format: CA:4D:10:XX:XX:XX" + ) + + args = parser.parse_args() + + t0 = time.time() + p = bluepy.btle.Peripheral(args.mac) + + #print(dir(p)) + + p.setMTU(90) + + c = p.getCharacteristics(uuid='42230301-2342-2342-2342-234223422342')[0] + c.getDescriptors()[0].write(b'\x01\x00') + #c.getDescriptors()[0].write(b'\x00\x00') + + print("Connection setup time:", int(time.time() - t0), "seconds") + + class MyDelegate(bluepy.btle.DefaultDelegate): + def __init__(self): + bluepy.btle.DefaultDelegate.__init__(self) + self.t0 = time.time() + + def handleNotification(self, cHandle, data): + global ecg + #temp = struct.unpack("<H", data)[0] / 100 + + if cHandle == 514: + + #print(cHandle, data, len(data), len(data) / 2) + volts = struct.unpack(">" + ("h" * (len(data) // 2)), data) + t = time.time() + + dt = t-self.t0 + #print(f'{dt:.3f}', cHandle, temp) + #print(f'{dt:.3f}', cHandle, list(data)) + print(f'{dt:.3f}', cHandle, volts) + self.t0 = t + ecg += volts + #plt.plot(volts) + #plt.pause(0.05) + + p.setDelegate(MyDelegate()) + + while True: + p.waitForNotifications(10.0) + +if __name__ == "__main__": + main()