|
| 1 | +# main.py |
| 2 | +# Some ports need to import 'sleep' from 'time' module |
| 3 | +from machine import sleep, SoftI2C, Pin |
| 4 | +from utime import ticks_diff, ticks_us, ticks_ms |
| 5 | + |
| 6 | +from max30102 import MAX30102, MAX30105_PULSE_AMP_MEDIUM |
| 7 | + |
| 8 | + |
| 9 | +class HeartRateMonitor: |
| 10 | + """A simple heart rate monitor that uses a moving window to smooth the signal and find peaks.""" |
| 11 | + |
| 12 | + def __init__(self, sample_rate=100, window_size=10, smoothing_window=5): |
| 13 | + self.sample_rate = sample_rate |
| 14 | + self.window_size = window_size |
| 15 | + self.smoothing_window = smoothing_window |
| 16 | + self.samples = [] |
| 17 | + self.timestamps = [] |
| 18 | + self.filtered_samples = [] |
| 19 | + |
| 20 | + def add_sample(self, sample): |
| 21 | + """Add a new sample to the monitor.""" |
| 22 | + timestamp = ticks_ms() |
| 23 | + self.samples.append(sample) |
| 24 | + self.timestamps.append(timestamp) |
| 25 | + |
| 26 | + # Apply smoothing |
| 27 | + if len(self.samples) >= self.smoothing_window: |
| 28 | + smoothed_sample = ( |
| 29 | + sum(self.samples[-self.smoothing_window :]) / self.smoothing_window |
| 30 | + ) |
| 31 | + self.filtered_samples.append(smoothed_sample) |
| 32 | + else: |
| 33 | + self.filtered_samples.append(sample) |
| 34 | + |
| 35 | + # Maintain the size of samples and timestamps |
| 36 | + if len(self.samples) > self.window_size: |
| 37 | + self.samples.pop(0) |
| 38 | + self.timestamps.pop(0) |
| 39 | + self.filtered_samples.pop(0) |
| 40 | + |
| 41 | + def find_peaks(self): |
| 42 | + """Find peaks in the filtered samples.""" |
| 43 | + peaks = [] |
| 44 | + |
| 45 | + if len(self.filtered_samples) < 3: # Need at least three samples to find a peak |
| 46 | + return peaks |
| 47 | + |
| 48 | + # Calculate dynamic threshold based on the min and max of the recent window of filtered samples |
| 49 | + recent_samples = self.filtered_samples[-self.window_size :] |
| 50 | + min_val = min(recent_samples) |
| 51 | + max_val = max(recent_samples) |
| 52 | + threshold = ( |
| 53 | + min_val + (max_val - min_val) * 0.5 |
| 54 | + ) # 50% between min and max as a threshold |
| 55 | + |
| 56 | + for i in range(1, len(self.filtered_samples) - 1): |
| 57 | + if ( |
| 58 | + self.filtered_samples[i] > threshold |
| 59 | + and self.filtered_samples[i - 1] < self.filtered_samples[i] |
| 60 | + and self.filtered_samples[i] > self.filtered_samples[i + 1] |
| 61 | + ): |
| 62 | + peak_time = self.timestamps[i] |
| 63 | + peaks.append((peak_time, self.filtered_samples[i])) |
| 64 | + |
| 65 | + return peaks |
| 66 | + |
| 67 | + def calculate_heart_rate(self): |
| 68 | + """Calculate the heart rate in beats per minute (BPM).""" |
| 69 | + peaks = self.find_peaks() |
| 70 | + |
| 71 | + if len(peaks) < 2: |
| 72 | + return None # Not enough peaks to calculate heart rate |
| 73 | + |
| 74 | + # Calculate the average interval between peaks in milliseconds |
| 75 | + intervals = [] |
| 76 | + for i in range(1, len(peaks)): |
| 77 | + interval = ticks_diff(peaks[i][0], peaks[i - 1][0]) |
| 78 | + intervals.append(interval) |
| 79 | + |
| 80 | + average_interval = sum(intervals) / len(intervals) |
| 81 | + |
| 82 | + # Convert intervals to heart rate in beats per minute (BPM) |
| 83 | + heart_rate = ( |
| 84 | + 60000 / average_interval |
| 85 | + ) # 60 seconds per minute * 1000 ms per second |
| 86 | + |
| 87 | + return heart_rate |
| 88 | + |
| 89 | + |
| 90 | +def main(): |
| 91 | + # I2C software instance |
| 92 | + i2c = SoftI2C( |
| 93 | + sda=Pin(8), # Here, use your I2C SDA pin |
| 94 | + scl=Pin(9), # Here, use your I2C SCL pin |
| 95 | + freq=400000, |
| 96 | + ) # Fast: 400kHz, slow: 100kHz |
| 97 | + |
| 98 | + # Examples of working I2C configurations: |
| 99 | + # Board | SDA pin | SCL pin |
| 100 | + # ------------------------------------------ |
| 101 | + # ESP32 D1 Mini | 22 | 21 |
| 102 | + # TinyPico ESP32 | 21 | 22 |
| 103 | + # Raspberry Pi Pico | 16 | 17 |
| 104 | + # TinyS3 | 8 | 9 |
| 105 | + |
| 106 | + # Sensor instance |
| 107 | + sensor = MAX30102(i2c=i2c) # An I2C instance is required |
| 108 | + |
| 109 | + # Scan I2C bus to ensure that the sensor is connected |
| 110 | + if sensor.i2c_address not in i2c.scan(): |
| 111 | + print("Sensor not found.") |
| 112 | + return |
| 113 | + elif not (sensor.check_part_id()): |
| 114 | + # Check that the targeted sensor is compatible |
| 115 | + print("I2C device ID not corresponding to MAX30102 or MAX30105.") |
| 116 | + return |
| 117 | + else: |
| 118 | + print("Sensor connected and recognized.") |
| 119 | + |
| 120 | + # Load the default configuration |
| 121 | + print("Setting up sensor with default configuration.", "\n") |
| 122 | + sensor.setup_sensor() |
| 123 | + |
| 124 | + # Set the sample rate to 400: 400 samples/s are collected by the sensor |
| 125 | + sensor_sample_rate = 400 |
| 126 | + sensor.set_sample_rate(sensor_sample_rate) |
| 127 | + |
| 128 | + # Set the number of samples to be averaged per each reading |
| 129 | + sensor_fifo_average = 8 |
| 130 | + sensor.set_fifo_average(sensor_fifo_average) |
| 131 | + |
| 132 | + # Set LED brightness to a medium value |
| 133 | + sensor.set_active_leds_amplitude(MAX30105_PULSE_AMP_MEDIUM) |
| 134 | + |
| 135 | + # Expected acquisition rate: 400 Hz / 8 = 50 Hz |
| 136 | + actual_acquisition_rate = int(sensor_sample_rate / sensor_fifo_average) |
| 137 | + |
| 138 | + sleep(1) |
| 139 | + |
| 140 | + print( |
| 141 | + "Starting data acquisition from RED & IR registers...", |
| 142 | + "press Ctrl+C to stop.", |
| 143 | + "\n", |
| 144 | + ) |
| 145 | + sleep(1) |
| 146 | + |
| 147 | + # Initialize the heart rate monitor |
| 148 | + hr_monitor = HeartRateMonitor( |
| 149 | + # Select a sample rate that matches the sensor's acquisition rate |
| 150 | + sample_rate=actual_acquisition_rate, |
| 151 | + # Select a significant window size to calculate the heart rate (2-5 seconds) |
| 152 | + window_size=int(actual_acquisition_rate * 3), |
| 153 | + ) |
| 154 | + |
| 155 | + # Setup to calculate the heart rate every 2 seconds |
| 156 | + hr_compute_interval = 2 # seconds |
| 157 | + ref_time = ticks_ms() # Reference time |
| 158 | + |
| 159 | + while True: |
| 160 | + # The check() method has to be continuously polled, to check if |
| 161 | + # there are new readings into the sensor's FIFO queue. When new |
| 162 | + # readings are available, this function will put them into the storage. |
| 163 | + sensor.check() |
| 164 | + |
| 165 | + # Check if the storage contains available samples |
| 166 | + if sensor.available(): |
| 167 | + # Access the storage FIFO and gather the readings (integers) |
| 168 | + red_reading = sensor.pop_red_from_storage() |
| 169 | + ir_reading = sensor.pop_ir_from_storage() |
| 170 | + |
| 171 | + # Add the IR reading to the heart rate monitor |
| 172 | + # Note: based on the skin color, the red, IR or green LED can be used |
| 173 | + # to calculate the heart rate with more accuracy. |
| 174 | + hr_monitor.add_sample(ir_reading) |
| 175 | + |
| 176 | + # Periodically calculate the heart rate every `hr_compute_interval` seconds |
| 177 | + if ticks_diff(ticks_ms(), ref_time) / 1000 > hr_compute_interval: |
| 178 | + # Calculate the heart rate |
| 179 | + heart_rate = hr_monitor.calculate_heart_rate() |
| 180 | + if heart_rate is not None: |
| 181 | + print("Heart Rate: {:.0f} BPM".format(heart_rate)) |
| 182 | + else: |
| 183 | + print("Not enough data to calculate heart rate") |
| 184 | + # Reset the reference time |
| 185 | + ref_time = ticks_ms() |
| 186 | + |
| 187 | + |
| 188 | +if __name__ == "__main__": |
| 189 | + main() |
0 commit comments