Source code for pyPPG.fiducials

import pyPPG

import copy
import pandas as pd
import numpy as np
from dotmap import DotMap
from scipy.signal import kaiserord, firwin, filtfilt, detrend, periodogram, lfilter, find_peaks, firls, resample
from scipy import signal

[docs] class FpCollection: ########################################################################### ###################### Initialization of Fiducial Points ################## ########################################################################### def __init__(self, s: pyPPG.PPG): """ The purpose of the FiducialPoints class is to calculate the fiducial points. :param s: object of PPG signal :type s: pyPPG.PPG object """ keys=s.__dict__.keys() keys_list = list(keys) for i in keys_list: exec('self.'+i+' = s.'+i) ########################################################################### ############################ Get Fiducial Points ########################## ###########################################################################
[docs] def get_fiducials(self, s: pyPPG.PPG): '''This function calculates the PPG Fiducial Points. - Original signal: List of systolic peak, pulse onset, dicrotic notch, and diastolic peak - 1st derivative: List of points of 1st maximum and minimum in 1st derivitive between the onset to onset intervals (u,v) - 2nd derivative: List of maximum and minimum points in 2nd derivitive between the onset to onset intervals (a, b, c, d, e) :param s: object of PPG signal :type s: pyPPG.PPG object :return: fiducial points: DataFrame where the key is the name of the fiducial pints and the value is the list of fiducial points ''' # 'ABD' refers the original Aboy peak detectorm and 'PPGdet' refers the improved version. peak_detector='PPGdet' # Extract Fiducial Points ppg_fp=pd.DataFrame() peaks, onsets = self.get_peak_onset(peak_detector) dicroticnotch = self.get_dicrotic_notch(peaks, onsets) vpg_fp = self.get_vpg_fiducials(onsets) apg_fp = self.get_apg_fiducials(onsets, peaks) jpg_fp = self.get_jpg_fiducials(onsets, apg_fp) diastolicpeak = self.get_diastolic_peak(onsets, dicroticnotch, apg_fp.e) # Merge Fiducial Points keys=('on', 'sp', 'dn','dp') dummy = np.empty(len(peaks)) dummy.fill(np.NaN) n=0 for temp_val in (onsets,peaks,dicroticnotch,diastolicpeak): ppg_fp[keys[n]] = dummy ppg_fp[keys[n]][0:len(temp_val)]=temp_val n=n+1 fiducials=pd.DataFrame() for temp_sig in (ppg_fp,vpg_fp,apg_fp,jpg_fp): for key in list(temp_sig.keys()): fiducials[key] = dummy temp_val = temp_sig[key].values fiducials[key][0:len(temp_val)]=temp_val # Correct Fiducial Points fiducials=self.correct_fiducials(fiducials, s.correction) fiducials=fiducials.astype('Int64') # Extract pulse offsets offsets = copy.deepcopy(fiducials.on[1:]) offsets.index = offsets.index - 1 # Update index name fiducials = fiducials.drop(len(fiducials)-1) fiducials = fiducials.rename_axis('Index of pulse') # Add pulse offsets fiducials.insert(4, 'off', offsets) return fiducials
########################################################################### ############################ PPG beat detector ############################ ########################################################################### def get_peak_onset(self, peak_detector='PPGdet'): '''PPGdet detects beats in a photoplethysmogram (PPG) signal using the improved 'Automatic Beat Detection' of Aboy M et al. :param peak_detector: type of peak detector :type peak_detector: str :return: - peaks, 1-d array: indices of detected systolic peaks - onsets, 1-d array: indices of detected pulse onsets Reference --------- Aboy M et al., An automatic beat detection algorithm for pressure signals. IEEE Trans Biomed Eng 2005; 52: 1662 - 70. <https://doi.org/10.1109/TBME.2005.855725> Author: Marton A. Goda – Faculty of Biomedical Engineering, Technion – Israel Institute of Technology, Haifa, Israel (August 2022) Original Matlab implementation: Peter H. Charlton – King's College London (August 2017) – University of Cambridge (February 2022) <https://github.com/peterhcharlton/ppg-beats> Changes from Charlton's implementation: 1) Detect Maxima: * Systolic peak-to-peak distance is predicted by the heart rate estimate over the preceding 10 sec window. * The peak location is estimated by distances and prominences of the previous peaks. 2) Find Onsets: * The onset is a local minimum, which is always calculated from the peak that follows it within a given time window 3) Tidy of Peaks and Onsets: * There is a one-to-one correspondence between onsets and peaks * There are only onset and peak pairs * The distance between the onset and peak pairs can't be smaller than 30 ms 4) Correct Peaks and Onsets: * The Peaks must be the highest amplitude between two consecutive pulse onsets, if not, then these are corrected * After the correction of Peaks, the Onsets are recalculated ''' # inputs x = copy.deepcopy(self.ppg) #signal fso=self.fs fs = 75 x = resample(x, int(len(self.ppg)*(fs/fso))) up = self.set_beat_detection() #settings win_sec=10 w = fs * win_sec #window length(number of samples) win_starts = np.array(list(range(0,len(x),round(0.8*w)))) win_starts = win_starts[0:min(np.where([win_starts >= len(x) - w])[1])] win_starts = np.insert(win_starts,len(win_starts), len(x) + 1 - w) # before pre-processing hr_win=0 #the estimated systolic peak-to-peak distance, initially it is 0 hr_win_v=[] px = self.detect_maxima(x, 0, hr_win, peak_detector) # detect all maxima if len(px)==0: peaks = [] onsets = [] return peaks, onsets # detect peaks in windows all_p4 = [] all_hr = np.empty(len(win_starts)-1) all_hr [:] = np.NaN hr_past = 0 # the actual heart rate hrvi = 0 # heart rate variability index for win_no in range(0,len(win_starts) - 1): curr_els = range(win_starts[win_no],win_starts[win_no] + w) curr_x = x[curr_els] y1 = self.def_bandpass(curr_x, fs, 0.9 * up.fl_hz, 3 * up.fh_hz) # Filter no.1 hr = self.estimate_HR(y1, fs, up, hr_past) # Estimate HR from weakly filtered signal hr_past=hr all_hr[win_no] = hr if (peak_detector=='PPGdet') and (hr>40): if win_no==0: p1 = self.detect_maxima(y1, 0, hr_win, peak_detector) tr = np.percentile(np.diff(p1), 50) pks_diff = np.diff(p1) pks_diff = pks_diff[pks_diff>=tr] hrvi = np.std(pks_diff) / np.mean(pks_diff) * 5 hr_win = fs / ((1 + hrvi) * 3) hr_win_v.append(hr_win) else: hr_win=0 y2 = self.def_bandpass(curr_x, fs, 0.9 * up.fl_hz, 2.5 * hr / 60) # Filter no. 2 y2_deriv = self.estimate_deriv(y2) # Estimate derivative from highly filtered signal p2 = self.detect_maxima(y2_deriv, up.deriv_threshold,hr_win, peak_detector) # Detect maxima in derivative y3 = self.def_bandpass(curr_x, fs, 0.9 * up.fl_hz, 10 * hr / 60) p3 = self.detect_maxima(y3, 50, hr_win, peak_detector) # Detect maxima in moderately filtered signal p4 = self.find_pulse_peaks(p2, p3) p4 = np.unique(p4) if peak_detector=='PPGdet': if len(p4)>round(win_sec/2): pks_diff = np.diff(p4) tr = np.percentile(pks_diff, 30) pks_diff = pks_diff[pks_diff >= tr] med_hr=np.median(all_hr[np.where(all_hr>0)]) if ((med_hr*0.5<np.mean(pks_diff)) and (med_hr*1.5<np.mean(pks_diff))): hrvi = np.std(pks_diff) / np.mean(pks_diff)*10 all_p4 = np.concatenate((all_p4, win_starts[win_no] + p4), axis=None) all_p4=all_p4.astype(int) all_p4 = np.unique(all_p4) peaks, fn = self.correct_IBI(all_p4, px, np.median(all_hr), fs, up) peaks = (all_p4/fs*fso).astype(int) onsets, peaks = self.find_onsets(self.ppg, fso, up, peaks,60/np.median(all_hr)*fs) # Correct Peaks for i in range(0, len(peaks) - 1): max_loc = np.argmax(self.ppg[onsets[i]:onsets[i + 1]]) + onsets[i] if peaks[i] != max_loc: peaks[i] = max_loc # Correct onsets onsets, peaks = self.find_onsets(self.ppg, fso, up, peaks, 60 / np.median(all_hr) * fs) temp_i = np.where(np.diff(onsets) == 0)[0] if len(temp_i) > 0: peaks = np.delete(peaks, temp_i) onsets = np.delete(onsets, temp_i) temp_i = np.where((peaks - onsets) < fso / 30)[0] if len(temp_i) > 0: peaks = np.delete(peaks, temp_i) onsets = np.delete(onsets, temp_i) return peaks, onsets ########################################################################### ############################# Maximum detector ############################ ########################################################################### def detect_maxima(self, sig: np.array, percentile: int ,hr_win: int, peak_detector: str): #Table VI pseudocode """ Detect Maxima function detects all peaks in the raw and also in the filtered signal to find. :param sig: array of signal with shape (N,) where N is the length of the signal :type sig: 1-d array :param percentile: in each signal partition, a rank filter detects the peaks above a given percentile :type percentile: int :param hr_win: window for adaptive the heart rate estimate :type hr_win: int :param peak_detector: type of peak detector :type peak_detector: str :return: maximum peaks of signal, 1-d array. """ tr = np.percentile(sig, percentile) if peak_detector=='ABD': s1,s2,s3 = sig[2:], sig[1:-1],sig[0:-2] m = 1 + np.array(np.where((s1 < s2) & (s3 < s2))) max_pks = m[sig[m] > tr] if peak_detector=='PPGdet': s1,s2,s3 = sig[2:], sig[1:-1],sig[0:-2] max_loc = [] min_loc = [] max_pks=[] intensity_v = [] if hr_win == 0: m = 1 + np.array(np.where((s1 < s2) & (s3 < s2))) max_pks = m[sig[m] > tr] else: max_loc = find_peaks(sig, distance=hr_win)[0] min_loc = find_peaks(-sig, distance=hr_win)[0] for i in range(0,len(max_loc)): values = abs(max_loc[i] - min_loc) min_v = min(values) min_i = np.where(min_v==values)[0][0] intensity_v.append(sig[max_loc[i]] - sig[min_loc[min_i]]) # possible improvements: # - using adaptive thresholding for the maximum # - estimate probability density of the maximum tr2 = np.mean(intensity_v)*0.25 max_pks = find_peaks(sig+min(sig),prominence=tr2,distance=hr_win)[0] return max_pks ########################################################################### ############################ Bandpass filtering ########################### ########################################################################### def def_bandpass(self, sig: np.array, fs: int, lower_cutoff: float, upper_cutoff: float): """ def_bandpass filter function detects all peaks in the raw and also in the filtered signal to find. :param sig: array of signal with shape (N,) where N is the length of the signal :type sig: 1-d array :param fs: sampling frequency :type fs: int :param lower_cutoff: lower cutoff frequency :type lower_cutoff: float :param upper_cutoff: upper cutoff frequency :type upper_cutoff: float :return: bandpass filtered signal, 1-d array. """ # Filter characteristics: Eliminate VLFs (below resp freqs): For 4bpm cutoff up = DotMap() up.paramSet.elim_vlf.Fpass = 1.3*lower_cutoff #in Hz up.paramSet.elim_vlf.Fstop = 0.8*lower_cutoff #in Hz up.paramSet.elim_vlf.Dpass = 0.05 up.paramSet.elim_vlf.Dstop = 0.01 # Filter characteristics: Eliminate VHFs (above frequency content of signals) up.paramSet.elim_vhf.Fpass = 1.2*upper_cutoff #in Hz up.paramSet.elim_vhf.Fstop = 0.8*upper_cutoff #in Hz up.paramSet.elim_vhf.Dpass = 0.05 up.paramSet.elim_vhf.Dstop = 0.03 # perform BPF s = DotMap() s.v = sig s.fs = fs b, a = signal.iirfilter(5, [2 * np.pi * lower_cutoff, 2 * np.pi * upper_cutoff], rs=60, btype='band', analog=True, ftype='cheby2') bpf_sig = filtfilt(b, 1, s.v) return bpf_sig ########################################################################### ################### Filter the high frequency components ################# ########################################################################### def elim_vlfs(self, s: np.array, up: DotMap, lower_cutoff: float): """ This function filter the high frequency components. :param s: array of signal with shape (N,) where N is the length of the signal :type s: 1-d array :param up: setup up parameters of the algorithm :type up: DotMap :param lower_cutoff: lower cutoff frequency :type lower_cutoff: float :return: high frequency filtered signal, 1-d array. """ ## Filter pre-processed signal to remove frequencies below resp # Adapted from RRest ## Eliminate nans s.v[np.isnan(s.v)] = np.mean(s.v[~np.isnan(s.v)]) ##Make filter fc=lower_cutoff ripple=-20*np.log10(up.paramSet.elim_vlf.Dstop) width=abs(up.paramSet.elim_vlf.Fpass-up.paramSet.elim_vlf.Fstop)/(s.fs/2) [N,beta] = kaiserord(ripple,width) if N * 3 > len(s): N = round(N / 3) b = firwin(N, fc * 2 / s.fs, window=('kaiser', beta), scale=('True')) AMfilter = b s_filt=DotMap() try: s_filt.v = filtfilt(AMfilter, 1, s.v) s_filt.v = s.v-s_filt.v except: s_filt.v = s.v s_filt.fs = s.fs return s_filt ########################################################################### ################### Filter the low frequency components ################## ########################################################################### def elim_vhfs(self, s: np.array, up: DotMap, upper_cutoff: float): """ This function filter the high frequency components. :param s: array of signal with shape (N,) where N is the length of the signal :type s: 1-d array :param up: setup up parameters of the algorithm :type up: DotMap :param upper_cutoff: lower cutoff frequency :type upper_cutoff: float :return: low frequency filtered signal, 1-d array. """ ## Filter signal to remove VHFs # Adapted from RRest s_filt = DotMap() ##Eliminate nans s.v[np.isnan(s.v)] = np.mean(s.v[~np.isnan(s.v)]) ##Check to see if sampling freq is at least twice the freq of interest if (up.paramSet.elim_vhf.Fpass/(s.fs/2)) >= 1: s_filt.v = s.v return fc = upper_cutoff ripple = -20 * np.log10(up.paramSet.elim_vhf.Dstop) width = abs(up.paramSet.elim_vhf.Fpass - up.paramSet.elim_vhf.Fstop) / (s.fs / 2) [N, beta] = kaiserord(ripple, width) if N * 3 > len(s): N = round(N / 3) b = firwin(N, fc * 2 / s.fs, window=('kaiser', beta), scale=('True')) AMfilter = b ## Remove VHFs s_dt=detrend(s.v) s_filt.v = filtfilt(AMfilter, 1, s_dt) return s_filt ########################################################################### ########################### Heart Rate estimation ######################### ########################################################################### def estimate_HR(self, sig: np.array, fs: int, up: DotMap, hr_past: int): """ Heart Rate Estimation function estimate the heart rate according to the previous heart rate in given time window :param sig: array of signal with shape (N,) where N is the length of the signal :type sig: 1-d array :type fs: int :param up: setup up parameters of the algorithm :type up: DotMap :param hr_past: the average heart rate in the past in given time window :type hr_past: int :return: estimated heart rate, 1-d array. """ # Estimate PSD blackman_window = np.blackman(len(sig)) f, pxx = periodogram(sig,fs, blackman_window) ph = pxx fh = f # Extract HR if (hr_past / 60 < up.fl_hz) | (hr_past / 60 > up.fh_hz): rel_els = np.where((fh >= up.fl_hz) & (fh <= up.fh_hz)) else: rel_els = np.where((fh >= hr_past / 60 * 0.5) & (fh <= hr_past / 60 * 1.4)) rel_p = ph[rel_els] rel_f = fh[rel_els] max_el = np.where(rel_p==max(rel_p)) hr = rel_f[max_el]*60 hr = int(hr[0]) return hr ########################################################################### ############# Estimate derivative from highly filtered signal ############# ########################################################################### def estimate_deriv(self, sig: np.array): """ Derivative Estimation function estimate derivative from highly filtered signal based on the General least-squares smoothing and differentiation by the convolution (Savitzky Golay) method :param sig: array of signal with shape (N,) where N is the length of the signal :type sig: 1-d array :return: derivative, 1-d array. """ #Savitzky Golay deriv_no = 1 win_size = 5 deriv = self.savitzky_golay(sig, deriv_no, win_size) return deriv def savitzky_golay(self, sig: np.array, deriv_no: int, win_size: int): """ This function estimate the Savitzky Golay derivative from highly filtered signal :param sig: array of signal with shape (N,) where N is the length of the signal :type sig: 1-d array :param deriv_no: number of derivative :type deriv_no: int :param win_size: size of window :type win_size: int :return: Savitzky Golay derivative, 1-d array. """ ##assign coefficients # From: https: // en.wikipedia.org / wiki / Savitzky % E2 % 80 % 93 Golay_filter # Tables_of_selected_convolution_coefficients # which are calculated from: A., Gorry(1990). "General least-squares smoothing and differentiation by the convolution (Savitzky?Golay) method".Analytical Chemistry. 62(6): 570?3. doi: 10.1021 / ac00205a007. if deriv_no==0: #smoothing if win_size == 5: coeffs = [-3, 12, 17, 12, -3] norm_factor = 35 elif win_size == 7: coeffs = [-2, 3, 6, 7, 6, 3, -2] norm_factor = 21 elif win_size == 9: coeffs = [-21, 14, 39, 54, 59, 54, 39, 14, -21] norm_factor = 231 else: print('Can''t do this window size') elif deriv_no==1: # first derivative if win_size == 5: coeffs = range(-2,3) norm_factor = 10 elif win_size == 7: coeffs = range(-3,4) norm_factor = 28 elif win_size == 9: coeffs = range(-4,5) norm_factor = 60 else: print('Can''t do this window size') elif deriv_no == 2: # second derivative if win_size == 5: coeffs = [2, -1, -2, -1, 2] norm_factor = 7 elif win_size == 7: coeffs = [5, 0, -3, -4, -3, 0, 5] norm_factor = 42 elif win_size == 9: coeffs = [28, 7, -8, -17, -20, -17, -8, 7, 28] norm_factor = 462 else: print('Can''t do this window size') elif deriv_no == 3: # third derivative if win_size == 5: coeffs = [-1, 2, 0, -2, 1] norm_factor = 2 elif win_size == 7: coeffs = [-1, 1, 1, 0, -1, -1, 1] norm_factor = 6 elif win_size == 9: coeffs = [-14, 7, 13, 9, 0, -9, -13, -7, 14] norm_factor = 198 else: print('Can''t do this window size') elif deriv_no == 4: # fourth derivative if win_size == 7: coeffs = [3, -7, 1, 6, 1, -7, 3] norm_factor = 11 elif win_size == 9: coeffs = [14, -21, -11, 9, 18, 9, -11, -21, 14] norm_factor = 143 else: print('Can''t do this window size') else: print('Can''t do this order of derivative') if deriv_no % 2 == 1: coeffs = -np.array(coeffs) A = [1, 0] filtered_sig = lfilter(coeffs, A, sig) # filtered_sig = filtfilt(coeffs, A, sig) s = len(sig) half_win_size = np.floor(win_size * 0.5) zero_pad=filtered_sig[win_size] * np.ones(int(half_win_size)) sig_in=filtered_sig[win_size-1:s] sig_end=filtered_sig[s-1] * np.ones(int(half_win_size)) deriv = [*zero_pad,*sig_in,*sig_end] deriv = deriv / np.array(norm_factor) return deriv ########################################################################### ############################# Pulse detection ############################# ########################################################################### def find_pulse_peaks(self, p2: np.array, p3: np.array): """ Pulse detection function detect the pulse peaks according to the peaks of 1st and 2nd derivatives General least-squares smoothing and differentiation by the convolution (Savitzky Golay) method :param p2: peaks of the 1st derivatives :type p2: 1-d array :param p3: peaks of the 2nd derivatives :type p2: 1-d array :return: pulse peaks, 1-d array. """ p4 = np.empty(len(p2)) p4[:] = np.NaN for k in range(0,len(p2)): rel_el = np.where(p3>p2[k]) if np.any(rel_el) and ~np.isnan(rel_el[0][0]): p4[k] = p3[rel_el[0][0]] p4 = p4[np.where(~np.isnan(p4))] p4 = p4.astype(int) return p4 ########################################################################### ####################### Correct peaks' location error ##################### ########################################################################### def correct_IBI(self, p: np.array, m: np.array, hr: float, fs: int, up: DotMap): """ This function corrects the peaks' location (interbeat intervals) error :param p: systolic peaks of the PPG signal :type p: 1-d array :param m: all maxima of the PPG signal :type m: 1-d array :param hr: heart rate :type hr: float :param fs: sampling frequency :type fs: int :param up: setup up parameters of the algorithm :type up: DotMap :return: onsets, 1-d array. """ #Correct peaks' location error due to pre-processing pc = np.empty(len(p)) pc[:] = np.NaN pc1=[] for k in range(0,len(p)): temp_pk=abs(m - p[k]) rel_el = np.where(temp_pk==min(temp_pk)) pc1=[*pc1,*m[rel_el]] # Correct false positives # identify FPs d = np.diff(pc1)/fs # interbeat intervals in secs fp = self.find_reduced_IBIs(d, hr, up) # remove FPs pc = np.array(pc1)[fp] # Correct false negatives d = np.diff(pc)/fs # interbeat intervals in secs fn = self.find_prolonged_IBIs(d, hr, up) pc = pc1 return pc, fn def find_reduced_IBIs(self,IBIs: np.array, med_hr: float, up: DotMap): """ This function finds the reduced interbeat intervals :param IBIs: interbeat intervals in secs :type IBIs: 1-d array :param med_hr: median heart rate :type med_hr: float :param up: setup up parameters of the algorithm :type up: DotMap :return: fp, the false positive case """ IBI_thresh = up.lower_hr_thresh_prop*60/med_hr fp = IBIs < IBI_thresh fp = [*np.where(fp == 0)[0].astype(int)] return fp def find_prolonged_IBIs(self, IBIs: np.array, med_hr: float, up: DotMap): """ This function finds the prolonged interbeat intervals :param IBIs: interbeat intervals in secs :type IBIs: 1-d array :param med_hr: median heart rate :type med_hr: float :param up: setup up parameters of the algorithm :type up: DotMap :return: fn, the false negative case """ IBI_thresh = up.upper_hr_thresh_prop*60/med_hr fn = IBIs > IBI_thresh return fn ########################################################################### ####################### Setup up the beat detector ######################## ########################################################################### def set_beat_detection(self): """ This function setups the filter parameters of the algorithm :return: filter parameters of the algorithm, DotMap """ # plausible HR limits up=DotMap() up.fl = 30 #lower bound for HR up.fh = 200 #upper bound for HR up.fl_hz = up.fl/60 up.fh_hz = up.fh/60 # Thresholds up.deriv_threshold = 75 #originally 90 up.upper_hr_thresh_prop = 2.25 #originally 1.75 up.lower_hr_thresh_prop = 0.5 #originally 0.75 # Other parameters up.win_size = 10 #in secs return up ########################################################################### ############################## Find PPG onsets ############################ ########################################################################### def find_onsets(self,sig: np.array, fs: int, up: DotMap, peaks: np.array, med_hr: float): """ This function finds the onsets of PPG sigal :param sig: array of signal with shape (N,) where N is the length of the signal :type sig: 1-d array :param fs: sampling frequency :type fs: int :param up: setup up parameters of the algorithm :type up: DotMap :param peaks: peaks of the signal :type peaks: 1-d array :param med_hr: median heart rate :type med_hr: float :return: onsets, 1-d array """ Y1=self.def_bandpass(sig, fs, 0.9*up.fl_hz, 3*up.fh_hz) temp_oi0=find_peaks(-Y1,distance=med_hr*0.3)[0] null_indexes = np.where(temp_oi0<peaks[0]) if len(null_indexes[0])!=0: if len(null_indexes[0])==1: onsets = [null_indexes[0][0]] else: onsets = [null_indexes[0][-1]] else: onsets = [peaks[0]-round(fs/50)] i=1 while i < len(peaks): min_SUT=fs*0.12 # minimum Systolic Upslope Time 120 ms min_DT=fs*0.3 # minimum Diastolic Time 300 ms before_peak=temp_oi0 <peaks[i] after_last_onset=temp_oi0 > onsets[i - 1] SUT_time=peaks[i]-temp_oi0>min_SUT DT_time = temp_oi0-peaks[i-1] > min_DT temp_oi1 = temp_oi0[np.where(before_peak * after_last_onset*SUT_time*DT_time)] if len(temp_oi1)>0: if len(temp_oi1) == 1: onsets.append(temp_oi1[0]) else: onsets.append(temp_oi1[-1]) i=i+1 else: peaks = np.delete(peaks, i) return onsets,peaks ########################################################################### ########################## Detect dicrotic notch ########################## ########################################################################### def get_dicrotic_notch (self, peaks: np.array, onsets: list): """ Dicrotic Notch function estimate the location of dicrotic notch in between the systolic and diastolic peak :param peaks: peaks of the signal :type peaks: 1-d array :param onsets: onsets of the signal :type onsets: list :return: location of dicrotic notches, 1-d array """ ## The 2nd derivative and Hamming low pass filter is calculated. dxx = np.diff(np.diff(self.ppg)) fs = self.fs # Make filter Fn = fs / 2 # Nyquist Frequency FcU = 20 # Cut off Frequency: 20 Hz FcD = FcU + 5 # Transition Frequency: 5 Hz n = 21 # Filter order f = [0, (FcU / Fn), (FcD / Fn), 1] # Frequency band edges a = [1, 1, 0, 0] # Amplitudes b = firls(n, f, a) lp_ppg = filtfilt(b, 1, dxx) # Low pass filtered signal with 20 cut off Frequency and 5 Hz Transition width ## The weighting is calculated and applied to each beat individually def t_wmax(i, peaks,onsets): if i < 3: HR = np.mean(np.diff(peaks))/fs t_wmax = -0.1 * HR + 0.45 else: t_wmax = np.mean(peaks[i - 3:i]-onsets[i - 3:i])/fs return t_wmax dic_not=[] for i in range(0,len(onsets)-1): nth_beat = lp_ppg[onsets[i]:onsets[i + 1]] i_Pmax=peaks[i]-onsets[i] t_Pmax=(peaks[i]-onsets[i])/fs t=np.linspace(0,len(nth_beat)-1,len(nth_beat))/fs T_beat=(len(nth_beat)-1)/fs tau=(t-t_Pmax)/(T_beat-t_Pmax) tau[0:i_Pmax] = 0 beta=5 if len(peaks)>1: t_w=t_wmax(i, peaks, onsets) else: t_w=np.NaN if t_w!=T_beat: tau_wmax=(t_w-t_Pmax)/(T_beat-t_Pmax) else: tau_wmax=0.9 alfa=(beta*tau_wmax-2*tau_wmax+1)/(1-tau_wmax) if (alfa > 4.5) or (alfa < 1.5): HR = np.mean(np.diff(peaks))/fs t_w = -0.1 * HR + 0.45 tau_wmax = (t_w - t_Pmax) / (T_beat - t_Pmax) alfa = (beta * tau_wmax - 2 * tau_wmax + 1) / (1 - tau_wmax) ## Calculate the Dicrotic Notch for each heart cycle using the weighted window if alfa>1: w = tau ** (alfa - 1) * (1 - tau) ** (beta - 1) else: w = tau * (1 - tau) ** (beta - 1) pp=w*nth_beat pp = pp[np.where(~np.isnan(pp))] max_pp_v = np.max(pp) max_pp_i=np.where(pp==max_pp_v)[0][0] ## NOTE!! Shifting with 26 ms. FIX IT! shift=int(self.fs*0.026) dic_not.append(max_pp_i+onsets[i]+shift) return dic_not ########################################################################### ########################## Detect diastolic peak ########################## ########################################################################### def get_diastolic_peak(self, onsets: list, dicroticnotch: list, e_point: pd.Series): """ Dicrotic Notch function estimate the location of dicrotic notch in between the systolic and diastolic peak :param onsets: onsets of the signal :type onsets: list :param dicroticnotches: dicrotic notches of the signal :type dicroticnotches: list :param e_point: e-points of the signal :type e_point: pd.Series :return: diastolicpeak location of dicrotic notches, 1-d array """ nan_v = np.empty(len(dicroticnotch)) nan_v[:] = np.NaN diastolicpeak = nan_v for i in range(0,len(dicroticnotch)): try: len_segments=np.diff(onsets)*0.80 end_segment=int(onsets[i]+len_segments[i]) try: start_segment = int(dicroticnotch[i]) temp_segment = self.ppg[start_segment:end_segment] max_locs, _ = find_peaks(temp_segment) if len(max_locs)==0: start_segment = int(e_point[i]) temp_segment = self.vpg[start_segment:end_segment] max_locs, _ = find_peaks(temp_segment) except: pass max_dn = max_locs[0] + start_segment diastolicpeak[i] = max_dn except: pass return diastolicpeak ########################################################################### ####################### Get First Derivitive Points ####################### ########################################################################### def get_vpg_fiducials(self, onsets: list): """Calculate first derivitive points u and v from the PPG' signal :param onsets: onsets of the signal :type onsets: list :return: - u: The highest amplitude between the pulse onset and systolic peak on PPG' - v: The lowest amplitude between the u-point and diastolic peak on PPG' - w: The first local maximum or inflection point after the dicrotic notch on PPG’ """ dx = self.vpg nan_v = np.empty(len(onsets)-1) nan_v[:] = np.NaN u, v, w = copy.deepcopy(nan_v),copy.deepcopy(nan_v),copy.deepcopy(nan_v), for i in range(0,len(onsets)-1): try: segment = dx[onsets[i]:onsets[i + 1]] # u fiducial point max_loc = np.argmax(segment)+onsets[i] u[i]=max_loc # v fiducial point upper_bound_coeff = 0.66 v_upper_bound = ((onsets[i + 1] - onsets[i]) * upper_bound_coeff + onsets[i]).astype(int) min_loc = np.argmin(dx[int(u[i]):v_upper_bound])+u[i]-1 v[i] = min_loc # w fiducial point temp_segment=dx[int(v[i]):onsets[i+1]] max_locs, _ = find_peaks(temp_segment) max_w = max_locs[0] + v[i] - 1 w[i] = max_w except: pass vpg_fp = pd.DataFrame({"u":[], "v":[], "w":[]}) vpg_fp.u, vpg_fp.v, vpg_fp.w = u, v, w return vpg_fp ########################################################################### ####################### Get Second Derivitive Points ###################### ########################################################################### def get_apg_fiducials(self, onsets: list, peaks: np.array): """Calculate Second derivitive points a, b, c, d, e, and f from the PPG" signal :param onsets: onsets of the signal :type onsets: list :param peaks: peaks of the signal :param types: 1-d array :return: - a: The highest amplitude between pulse onset and systolic peak on PPG" - b: The first local minimum after the a-point on PPG" - c: The local maximum with the highest amplitude between the b-point and e-point, or if no local maximum is present then the inflection point on PPG" - d: The local minimum with the lowest amplitude between the c-point and e-point, or if no local minimum is present then the inflection point on PPG" - e: The local maximum with the highest amplitude after the b-point and before the diastolic peak on PPG" - f: The first local minimum after the e-point on PPG" """ sig = self.ppg ddx = self.apg dddx = self.jpg nan_v = np.empty(len(onsets)-1) nan_v[:] = np.NaN a, b, c, d, e, f = copy.deepcopy(nan_v),copy.deepcopy(nan_v),copy.deepcopy(nan_v),copy.deepcopy(nan_v),copy.deepcopy(nan_v),copy.deepcopy(nan_v) for i in range(0,len(onsets)-1): try: # a fiducial point temp_pk=np.argmax(sig[onsets[i]:onsets[i + 1]])+onsets[i]-1 temp_segment=ddx[onsets[i]:temp_pk] max_locs, _ = find_peaks(temp_segment) try: max_loc = max_locs[np.argmax(temp_segment[max_locs])] except: max_loc = temp_segment.argmax() max_a = max_loc + onsets[i] - 1 a[i] = max_a # b fiducial point temp_segment=ddx[int(a[i]):onsets[i+1]] min_locs, _ = find_peaks(-temp_segment) min_b = min_locs[0] + a[i] - 1 b[i] = min_b # e fiducial point e_lower_bound = peaks[i] upper_bound_coeff = 0.6 e_upper_bound = ((onsets[i + 1] - onsets[i]) * upper_bound_coeff + onsets[i]).astype(int) temp_segment=ddx[int(e_lower_bound):int(e_upper_bound)] max_locs, _ = find_peaks(temp_segment) if max_locs.size==0: temp_segment=ddx[int(peaks[i]):onsets[i + 1]] max_locs, _ = find_peaks(temp_segment) try: max_loc = max_locs[np.argmax(temp_segment[max_locs])] except: max_loc = temp_segment.argmax() max_e = max_loc + e_lower_bound - 1 e[i] = max_e # c fiducial point temp_segment = ddx[int(b[i]):int(e[i])] max_locs, _ = find_peaks(temp_segment) if max_locs.size>0: max_loc = max_locs[0] else: temp_segment = dddx[int(b[i]):int(e[i])] min_locs, _ = find_peaks(-temp_segment) if min_locs.size > 0: max_loc = min_locs[np.argmin(temp_segment[min_locs])] else: max_locs, _ = find_peaks(temp_segment) try: max_loc = max_locs[0] except: max_loc = temp_segment.argmax() max_c = max_loc + b[i] - 1 c[i] = max_c # d fiducial point temp_segment = ddx[int(c[i]):int(e[i])] min_locs, _ = find_peaks(-temp_segment) if min_locs.size > 0: min_loc = min_locs[np.argmin(temp_segment[min_locs])] min_d = min_loc + c[i] - 1 else: min_d = max_c d[i] = min_d # f fiducial point temp_segment = ddx[int(e[i]):onsets[i + 1]] min_locs, _ = find_peaks(-temp_segment) if (min_locs.size > 0) and (min_locs[0]<len(sig)*0.8): min_loc = min_locs[0] else: min_loc = 0 min_f = min_loc + e[i] - 1 f[i] = min_f except: pass apg_fp = pd.DataFrame({"a":[], "b":[], "c":[],"d":[], "e":[], "f":[]}) apg_fp.a, apg_fp.b, apg_fp.c, apg_fp.d, apg_fp.e, apg_fp.f = a, b, c, d, e, f return apg_fp def get_jpg_fiducials(self, onsets: list, apg_fp: pd.DataFrame): """Calculate third derivitive points p1 and p2 from the PPG'" signal :param onsets: onsets of the signal :type onsets: list :param apg_fp: fiducial points of PPG" signal :type apg_fp: DataFrame :return: - p1: The first local maximum after the b-point on PPG'" - p2: The last local minimum after the b-point and before the d-point on PPG'" """ dddx = self.jpg nan_v = np.empty(len(onsets)-1) nan_v[:] = np.NaN p1, p2 = copy.deepcopy(nan_v), copy.deepcopy(nan_v) for i in range(0, len(onsets) - 1): try: # p1 fiducial point ref_b = apg_fp.b[np.squeeze(np.where(np.logical_and(apg_fp.b > onsets[i], apg_fp.b < onsets[i + 1])))] if ref_b.size == 0: ref_b = onsets[i] temp_segment = dddx[int(ref_b):onsets[i + 1]] max_locs, _ = find_peaks(temp_segment) try: max_loc = max_locs[0] except: max_loc = temp_segment.argmax() max_p1 = max_loc + ref_b - 1 p1[i] = max_p1 # p2 fiducial point ref_start = p1[i] ref_c = apg_fp.c[np.squeeze(np.where(np.logical_and(apg_fp.c > onsets[i], apg_fp.c < onsets[i + 1])))] ref_d = apg_fp.d[np.squeeze(np.where(np.logical_and(apg_fp.d > onsets[i], apg_fp.d < onsets[i + 1])))] if ref_d > ref_c: ref_end = ref_d min_ind = -1 elif ref_c.size > 0: ref_start = ref_c ref_end = onsets[i + 1] min_ind = 0 elif apg_fp.e.size > 0: ref_end = onsets[i + 1] min_ind = 0 temp_segment = dddx[int(ref_start):int(ref_end)] min_locs, _ = find_peaks(-temp_segment) if min_locs.size > 0: min_p2 = min_locs[min_ind] + ref_start - 1 else: min_p2 = ref_end p2[i] = min_p2 except: pass jpg_fp = pd.DataFrame({"p1": [], "p2": []}) jpg_fp.p1, jpg_fp.p2 = p1, p2 return jpg_fp def correct_fiducials(self,fiducials=pd.DataFrame(), correction=pd.DataFrame()): """Correct the Fiducial Points :param fiducials: DataFrame where the key is the name of the fiducial points and the value is the list of fiducial points PPG Fiducials Points :type fiducials: DataFrame :param correction: DataFrame where the key is the name of the fiducial points and the value is bool :type correction: DataFrame :return: - fiducials: a dictionary where the key is the name of the fiducial pints and the value is the list of fiducial points """ for i in range(0,len(fiducials.on)): # Correct onset if correction.on[0]: try: win_onr = self.fs * 0.005 if fiducials.on[i]>win_onr: win_onl = win_onr else: win_onl = fiducials.on[i] min_loc = np.argmin(self.ppg[fiducials.on[i]-win_onl:fiducials.on[i]+win_onr]) + fiducials.on[i] if fiducials.on[i] != min_loc: if fiducials.a[i] > self.fs*0.075: win_a = int(self.fs*0.075) else: win_a = int(fiducials.a[i]) fiducials.on[i] = np.argmax(self.jpg[int(fiducials.a[i]) - win_a:int(fiducials.a[i])]) + int(fiducials.a[i]) - win_a except: pass # Correct dicrotic notch if correction.dn[0]: try: temp_segment = self.ppg[int(fiducials.sp[i]):int(fiducials.dp[i])] min_dn = find_peaks(-temp_segment)[0] + fiducials.sp[i] diff_dn = abs(min_dn - fiducials.dp[i]) if len(min_dn) > 0 and diff_dn > round(self.fs / 100): fiducials.dn[i] = min_dn try: strt_dn = int(fiducials.sp[i]) stp_dn = int(fiducials.f[i]) fiducials.dn[i] = find_peaks(-self.ppg[strt_dn:stp_dn])[0][-1] + strt_dn if fiducials.dn[i] > min_dn: fiducials.dn[i] = min_dn except: strt_dn = fiducials.e[i] stp_dn = fiducials.f[i] fiducials.dn[i] = np.argmin(self.jpg[strt_dn:stp_dn]) + strt_dn if fiducials.dn[i] > min_dn: fiducials.dn[i] = min_dn except: pass # Correct w-point if correction.w[0]: if fiducials.w[i] > fiducials.f[i]: fiducials.loc[i, 'w'] = fiducials.f[i] if fiducials.w[i] < fiducials.e[i]: try: fiducials.loc[i, 'w'] = [np.argmax(self.vpg[int(fiducials.e[i]):int(fiducials.f[i])]) + fiducials.e[i]] except: pass # Correct v-point and w-point if correction.v[0] and correction.w[0]: if fiducials.v[i] > fiducials.e[i]: try: fiducials.loc[i, 'v'] = [np.argmin(self.vpg[int(fiducials.u[i]):int(fiducials.e[i])]) + fiducials.u[i]] fiducials.loc[i, 'w'] = [find_peaks(self.vpg[int(fiducials.v[i]):int(fiducials.f[i])])[0][0] + fiducials.v[i]] except: pass # Correct f-point if correction.f[0]: try: temp_end=int(np.diff(fiducials.on[i:i+2])*0.8) temp_segment=self.apg[int(fiducials.e[i]):int(fiducials.on[i]+temp_end)] min_f=np.argmin(temp_segment)+fiducials.e[i] if fiducials.w[i] > fiducials.f[i]: fiducials.f[i] = min_f except: pass # Correct diastolic peak if correction.dp[0]: try: fiducials.dp = self.get_diastolic_peak(fiducials.on, fiducials.dn, fiducials.e) except: pass return fiducials