一个完整周期的ECG信号有 QRS P T 波组成,不同的人相应不用的波形,同一个人在不同的阶段波形也不同。所以QRS检测是心电数据处理的基础。
https://github/c-labpl/qrs_detector
参考这个代码,用来检测MIH-BIH不成功,这里代码贴出来参考(修正了代码执行错误的点)。
import matplotlib.pyplot as plt
%matplotlib inline
import numpy as np
import os
import sys
import shutil
from time import gmtime, strftime
from scipy.signal import butter, lfilter
class QRSDetector(object):
"""
Python Offline ECG QRS Detector based on the Pan-Tomkins algorithm.
"""
def __init__(self, ecg_data_raw, verbose=True, plot_data=True, show_plot=True):
"""
QRSDetectorOffline class initialisation method.
:param numpy array ecg_data_raw: [time, value]
:param bool verbose: flag for printing the results
:param bool log_data: flag for logging the results
:param bool plot_data: flag for plotting the results to a file
:param bool show_plot: flag for showing generated results plot - will not show anything if plot is not generated
"""
# Configuration parameters
self.signal_frequency = 360 # Set ECG device frequency in samples per second here.
self.filter_lowcut = 0.001#lowcut cannot be zero
self.filter_highcut = 20.0
self.filter_order = 1
self.integration_window = 15 # Change proportionally when adjusting frequency (in samples).
self.findpeaks_limit = 0.35
self.findpeaks_spacing = 50 # Change proportionally when adjusting frequency (in samples).
self.refractory_period = 120 # Change proportionally when adjusting frequency (in samples).
self.qrs_peak_filtering_factor = 0.125
self.noise_peak_filtering_factor = 0.125
self.qrs_noise_diff_weight = 0.25
# Loaded ECG data.
self.ecg_data_raw = ecg_data_raw
# Measured and calculated values.
self.filtered_ecg_measurements = None
self.differentiated_ecg_measurements = None
self.squared_ecg_measurements = None
self.integrated_ecg_measurements = None
self.detected_peaks_indices = None
self.detected_peaks_values = None
self.qrs_peak_value = 0.0
self.noise_peak_value = 0.0
self.threshold_value = 0.0
# Detection results.
self.qrs_peaks_indices = np.array([], dtype=int)
self.noise_peaks_indices = np.array([], dtype=int)
# Final ECG data and QRS detection results array - samples with detected QRS are marked with 1 value.
self.ecg_data_detected = None
# Run whole detector flow.
self.detect_peaks()
self.detect_qrs()
if verbose:
self.print_detection_data()
if plot_data:
#PLOT_DIR = "plots/"
#self.plot_path = "{:s}QRS_offline_detector_plot_{:s}.png".format(PLOT_DIR,strftime("%Y_%m_%d_%H_%M_%S", gmtime()))
self.plot_detection_data(show_plot=show_plot)
"""ECG measurements data processing methods."""
def detect_peaks(self):
"""
Method responsible for extracting peaks from loaded ECG measurements data through measurements processing.
"""
# Extract measurements from loaded ECG data.
ecg_measurements = self.ecg_data_raw
# Measurements filtering - 0-15 Hz band pass filter.
self.filtered_ecg_measurements = self.bandpass_filter(ecg_measurements, lowcut=self.filter_lowcut,\
highcut=self.filter_highcut, signal_freq=self.signal_frequency,\
filter_order=self.filter_order)
self.filtered_ecg_measurements[:5] = self.filtered_ecg_measurements[5]
# Derivative - provides QRS slope information.
self.differentiated_ecg_measurements = np.ediff1d(self.filtered_ecg_measurements)
# Squaring - intensifies values received in derivative.
self.squared_ecg_measurements = self.differentiated_ecg_measurements ** 2
# Moving-window integration.
self.integrated_ecg_measurements = np.convolve(self.squared_ecg_measurements, np.ones(self.integration_window))
# Fiducial mark - peak detection on integrated measurements.
self.detected_peaks_indices = self.findpeaks(data=self.integrated_ecg_measurements,\
limit=self.findpeaks_limit,\
spacing=self.findpeaks_spacing)
self.detected_peaks_values = self.integrated_ecg_measurements[self.detected_peaks_indices]
"""QRS detection methods."""
def detect_qrs(self):
"""
Method responsible for classifying detected ECG measurements peaks either as noise or as QRS complex (heart beat).
"""
for detected_peak_index, detected_peaks_value in zip(self.detected_peaks_indices, self.detected_peaks_values):
try:
last_qrs_index = self.qrs_peaks_indices[-1]
except IndexError:
last_qrs_index = 0
# After a valid QRS complex detection, there is a 200 ms refractory period before next one can be detected.
if detected_peak_index - last_qrs_index > self.refractory_period or not self.qrs_peaks_indices.size:
# Peak must be classified either as a noise peak or a QRS peak.
# To be classified as a QRS peak it must exceed dynamically set threshold value.
if detected_peaks_value > self.threshold_value:
self.qrs_peaks_indices = np.append(self.qrs_peaks_indices, detected_peak_index)
# Adjust QRS peak value used later for setting QRS-noise threshold.
self.qrs_peak_value = self.qrs_peak_filtering_factor * detected_peaks_value + \
(1 - self.qrs_peak_filtering_factor) * self.qrs_peak_value
else:
self.noise_peaks_indices = np.append(self.noise_peaks_indices, detected_peak_index)
# Adjust noise peak value used later for setting QRS-noise threshold.
self.noise_peak_value = self.noise_peak_filtering_factor * detected_peaks_value + \
(1 - self.noise_peak_filtering_factor) * self.noise_peak_value
# Adjust QRS-noise threshold value based on previously detected QRS or noise peaks value.
self.threshold_value = self.noise_peak_value + \
self.qrs_noise_diff_weight * (self.qrs_peak_value - self.noise_peak_value)
# Create array containing both input ECG measurements data and QRS detection indication column.
# We mark QRS detection with '1' flag in 'qrs_detected' log column ('0' otherwise).
measurement_qrs_detection_flag = np.zeros([len(self.ecg_data_raw), 1])
measurement_qrs_detection_flag[self.qrs_peaks_indices] = 1
self.ecg_data_detected = np.append(self.ecg_data_raw, measurement_qrs_detection_flag, 1)
"""Results reporting methods."""
def print_detection_data(self):
"""
Method responsible for printing the results.
"""
print("qrs peaks indices")
print(self.qrs_peaks_indices)
print("noise peaks indices")
print(self.noise_peaks_indices)
def plot_detection_data(self, show_plot=True):
"""
Method responsible for plotting detection results.
:param bool show_plot: flag for plotting the results and showing plot
"""
def plot_data(axis, data, title='', fontsize=10):
axis.set_title(title, fontsize=fontsize)
axis.grid(which='both', axis='both', linestyle='--')
axis.plot(data, color="salmon", zorder=1)
def plot_points(axis, values, indices):
axis.scatter(x=indices, y=values[indices], c="black", s=50, zorder=2)
plt.close('all')
fig, axarr = plt.subplots(6, sharex=True, figsize=(15, 18))
plot_data(axis=axarr[0], data=self.ecg_data_raw, title='Raw ECG measurements')
plot_data(axis=axarr[1], data=self.filtered_ecg_measurements, title='Filtered ECG measurements')
plot_data(axis=axarr[2], data=self.differentiated_ecg_measurements, title='Differentiated ECG measurements')
plot_data(axis=axarr[3], data=self.squared_ecg_measurements, title='Squared ECG measurements')
plot_data(axis=axarr[4], data=self.integrated_ecg_measurements, title='Integrated ECG measurements with QRS peaks marked (black)')
plot_points(axis=axarr[4], values=self.integrated_ecg_measurements, indices=self.qrs_peaks_indices)
plot_data(axis=axarr[5], data=self.ecg_data_detected[:, 1], title='Raw ECG measurements with QRS peaks marked (black)')
plot_points(axis=axarr[5], values=self.ecg_data_detected[:, 1], indices=self.qrs_peaks_indices)
plt.tight_layout()
#fig.savefig(self.plot_path)
if show_plot:
plt.show()
plt.close()
"""Tools methods."""
def bandpass_filter(self, data, lowcut, highcut, signal_freq, filter_order):
"""
Method responsible for creating and applying Butterworth filter.
:param deque data: raw data
:param float lowcut: filter lowcut frequency value
:param float highcut: filter highcut frequency value
:param int signal_freq: signal frequency in samples per second (Hz)
:param int filter_order: filter order
:return array: filtered data
"""
nyquist_freq = 0.5 * signal_freq
low = lowcut / nyquist_freq
high = highcut / nyquist_freq
#https://github/scipy/scipy/blob/v0.19.1/scipy/signal/filter_design.py#L2226-L2297
b, a = butter(filter_order, [low, high], btype="band")
y = lfilter(b, a, data)
return y
def findpeaks(self, data, spacing=1, limit=None):
"""
Janko Slavic peak detection algorithm and implementation.
https://github/jankoslavic/py-tools/tree/master/findpeaks
Finds peaks in `data` which are of `spacing` width and >=`limit`.
:param ndarray data: data
:param float spacing: minimum spacing to the next peak (should be 1 or more)
:param float limit: peaks should have value greater or equal
:return array: detected peaks indexes array
"""
len = data.size
x = np.zeros(len + 2 * spacing)
x[:spacing] = data[0] - 1.e-6
x[-spacing:] = data[-1] - 1.e-6
x[spacing:spacing + len] = data
peak_candidate = np.zeros(len)
peak_candidate[:] = True
for s in range(spacing):
start = spacing - s - 1
h_b = x[start: start + len] # before
start = spacing
h_c = x[start: start + len] # central
start = spacing + s + 1
h_a = x[start: start + len] # after
peak_candidate = np.logical_and(peak_candidate, np.logical_and(h_c > h_b, h_c > h_a))
ind = np.argwhere(peak_candidate)
ind = ind.reshape(ind.size)
if limit is not None:
ind = ind[data[ind] > limit]
return ind
# QRS-P-T detection
file = os.path.join(rootdir,'100')
spl_len = 360*10#fs=360,times=10s
record = wfdb.rdrecord(file, sampfrom=0, sampto = spl_len, channel_names=['MLII', 'V1']) #fs=360,all 5 seconds
signal = record.p_signal[:,0].reshape(spl_len,1)#one channels
qrs_detector = QRSDetector(ecg_data_raw=signal)
ecg_data_raw是一维的值,主要self.filter_lowcut = 0.001不能设置为0。在MIT-BIH数据集上检测不成功,参数设置估计还需要研究,直接用https://github/MIT-LCP/wfdb-python/blob/master/tests/test_processing.py看看。
更多推荐
ECG-QRS检测参考代码
发布评论