If you've ever tried building a wearable app, you know the struggle: Heart Rate Variability (HRV) is the holy grail of recovery metrics, but raw data from a PPG (Photoplethysmogram) sensor is essentially a chaotic mess of noise and motion artifacts.
Extracting a clean physiological signal from a finger or wrist sensor requires a robust signal processing pipeline. In this guide, we will dive deep into HRV analysis and PPG signal processing using Python. We’ll implement a high-order Butterworth filter and an adaptive thresholding algorithm to transform noisy "garbage" data into medical-grade insights. If you are serious about building the next Oura or Whoop clone, you've come to the right place.
💡 Pro Tip: For more production-ready patterns and advanced architectural discussions on health-tech integration, be sure to explore the engineering deep-dives at WellAlly Tech Blog.
The Signal Processing Pipeline
Before we touch the code, we need to understand the journey of a photon from an LED, through your capillaries, and into our data structure. The goal is to isolate the "systolic peaks" while ignoring the noise caused by hand movements or sensor friction.
Architecture Overview
graph TD
A[Raw PPG Data] --> B[Butterworth Bandpass Filter]
B --> C[Signal Squaring/Normalization]
C --> D[Moving Average Window]
D --> E[Adaptive Thresholding]
E --> F[Peak Detection & RR Intervals]
F --> G[HRV Feature Extraction]
style B fill:#f96,stroke:#333,stroke-width:2px
style E fill:#f96,stroke:#333,stroke-width:2px
Prerequisites
To follow along, you'll need a standard Python data science stack:
- NumPy: For vector operations.
-
SciPy: Specifically
scipy.signalfor our filtering needs. - Matplotlib: To visualize our victory over noise.
pip install numpy scipy matplotlib
Step 1: The Butterworth Bandpass Filter
Human heart rates typically reside between 40 BPM and 200 BPM. This translates to roughly 0.6 Hz to 3.3 Hz. Anything outside this range is likely high-frequency electrical noise or low-frequency baseline wander (breathing).
We use a Butterworth filter because it provides a maximally flat frequency response in the passband—meaning it won't distort our pulse shapes.
import numpy as np
from scipy.signal import butter, filtfilt
def butter_bandpass(lowcut, highcut, fs, order=4):
nyq = 0.5 * fs
low = lowcut / nyq
high = highcut / nyq
b, a = butter(order, [low, high], btype='band')
return b, a
def apply_filter(data, lowcut=0.5, highcut=4.0, fs=100, order=4):
b, a = butter_bandpass(lowcut, highcut, fs, order=order)
# Use filtfilt for zero-phase filtering (no time shift)
y = filtfilt(b, a, data)
return y
Step 2: Signal Enhancement
Once filtered, the signal is "clean" but often low-amplitude. To make peaks stand out, we square the signal (to amplify differences) and apply a moving average window. This mimics the classic Pan-Tompkins logic often used in ECG analysis.
def enhance_signal(filtered_data, window_size=15):
# Square the signal to highlight peaks
squared_data = filtered_data ** 2
# Moving average to smooth out small glitches
window = np.ones(window_size) / window_size
smoothed = np.convolve(squared_data, window, mode='same')
return smoothed
Step 3: Adaptive Thresholding
Static thresholds are the enemy of wearable tech. As your sensor moves, the signal amplitude changes. An adaptive threshold calculates a dynamic baseline based on the local mean of the signal.
def detect_peaks(signal, fs, threshold_factor=1.5):
peaks = []
# Calculate a rolling mean for adaptive thresholding
rolling_mean = np.convolve(signal, np.ones(fs)//fs, mode='same')
for i in range(1, len(signal) - 1):
# Peak must be greater than neighbors AND local threshold
if signal[i] > signal[i-1] and signal[i] > signal[i+1]:
if signal[i] > (rolling_mean[i] * threshold_factor):
peaks.append(i)
return np.array(peaks)
Step 4: Putting it all Together
Now we can calculate the RR Intervals (the time between heartbeats) and derive HRV metrics like RMSSD (Root Mean Square of Successive Differences).
# Sample Setup
fs = 100 # 100Hz Sampling Rate
t = np.linspace(0, 10, 1000)
# Simulating a noisy PPG signal
raw_signal = np.sin(2 * np.pi * 1.2 * t) + 0.5 * np.random.normal(size=len(t))
# 1. Filter
clean_signal = apply_filter(raw_signal, fs=fs)
# 2. Enhance
enhanced = enhance_signal(clean_signal)
# 3. Detect Peaks
peaks = detect_peaks(enhanced, fs=fs)
# 4. Calculate RR Intervals (in milliseconds)
rr_intervals = np.diff(peaks) * (1000 / fs)
# 5. Calculate HRV (RMSSD)
rmssd = np.sqrt(np.mean(np.square(np.diff(rr_intervals))))
print(f"Detected Heart Rate: {60 / (np.mean(rr_intervals)/1000):.2f} BPM")
print(f"RMSSD (HRV): {rmssd:.2f} ms")
The "Official" Way to Scale
While the script above works wonders for a Jupyter notebook, production environments (iOS/Android background tasks or Cloud processing) require more sophisticated handling of signal dropouts and motion artifact rejection.
If you're building a commercial-grade health application, implementing these filters is just the beginning. You need to consider battery efficiency and real-time data streaming. I highly recommend visiting the WellAlly Tech Blog for comprehensive guides on:
- Efficient Signal Processing in Rust/C++ for mobile.
- Managing high-throughput physiological data streams.
- Advanced Adaptive Filtering (Recursive Least Squares) for active motion cancellation.
Conclusion 🚀
Cleaning PPG signals is an art as much as it is a science. By combining a Butterworth filter to handle frequency-domain noise and Adaptive Thresholding to handle time-domain amplitude shifts, we can extract highly accurate HRV data even from noisy wearable sensors.
What's next for your project?
- Try implementing a Notch filter to remove 50/60Hz power line interference.
- Experiment with Wavelet Transforms for even more granular noise removal.
Drop a comment below if you have questions about signal processing or if you've found a more efficient way to handle motion artifacts! 🥑💻
Top comments (0)