diff --git a/src/vitabel/vitals.py b/src/vitabel/vitals.py index 7e1fb02..69a81dc 100644 --- a/src/vitabel/vitals.py +++ b/src/vitabel/vitals.py @@ -1503,93 +1503,66 @@ def shocks(self): "Error. Different Defibrillation keys contain different timestamp. Cannot construct single DataFrame from this" ) return None - + ### def compute_etco2_and_ventilations( self, source: Channel | str = "capnography", - mode: Literal['threshold', 'filter'] = 'filter', + mode: Literal["filter", "filter_onset", "threshold"] = "filter", breath_threshold: float = 2, etco2_threshold: float = 3, **kwargs, ): - """Computes end-tidal CO2 (etCO₂) values and timestamps of ventilations from the - capnography waveform, and adds them as labels. - - The capnography signal must be present as a channel named 'capnography'. Two - detection methods are supported: - - - ``'filter'``: An unpublished method by Wolfgang Kern (default). - - ``'threshold'``: The method described by Aramendi et al. in - :cite:`10.1016/j.resuscitation.2016.08.033`. - - Parameters - ---------- - mode - Method to use for detecting ventilations from the CO₂ signal. - breath_threshold - Threshold below which a minimum is identified as a ventilation (default: 2 mmHg). - Used by the ``'filter'`` method. - etco2_threshold - Threshold above which a maximum is identified as an etCO₂ value of an expiration - (default: 3 mmHg). Used by the ``'filter'`` method. """ - # Support legacy parameter name + Computes etCO₂ values and timestamps of ventilations from capnography waveform. + Supports three methods: + - 'filter': Wolfgang Kern (unpublished) + - 'filter_onset': like 'filter' + detects exhalation onset + - 'threshold': Aramendi et al., 2016 + """ + if 'breaththresh' in kwargs: if breath_threshold is not None: raise TypeError("Cannot specify both 'breath_threshold' and legacy 'breaththresh'") breath_threshold = kwargs.pop('breaththresh') - logger.warning( - "The keyword argument breaththresh is deprecated, " - "use breath_threshold instead" - ) - + logger.warning("The keyword argument breaththresh is deprecated, use breath_threshold instead") + if isinstance(source, str): source = self.get_channel(name=source) - if not isinstance(source, Channel): - raise ValueError( - f"No valid capnography channel specified. Please make sure a channel named '{source}' " - "is added to the collection, or specify a suitable channel via " - "the source argument directly or by name" - ) - - co2_data = source.get_data() # get data - cotime, co = co2_data.time_index, co2_data.data - - freq = np.timedelta64(1, "s") / np.nanmedian(cotime.diff()) - cotime = np.asarray(cotime) - co = np.asarray(co) - if mode == "filter": # Wolfgang Kern's unpublished method + raise ValueError("Invalid capnography channel specified.") + + # Helper: metadata for all labels + metadata = { + "creator": "automatic", + "creation_date": pd.Timestamp.now(), + "creation_mode": mode, + } + + # Load signal + co2_data = source.get_data() + cotime, co = np.asarray(co2_data.time_index), np.asarray(co2_data.data) + freq = np.timedelta64(1, "s") / np.nanmedian(np.diff(cotime)) + + # Mode 1: FILTER (Kern original) + def _run_filter(): but = sgn.butter(4, 1 * 2 / freq, btype="lowpass", output="sos") - co2 = sgn.sosfiltfilt(but, co) # Filter forwarsd and backward - et_index = sgn.find_peaks(co2, distance=1 * freq, height=etco2_threshold)[ - 0 - ] # find peaks of filtered signal as markers for etco2 - resp_index = sgn.find_peaks( - -co2, distance=1 * freq, height=-breath_threshold - )[ # find dips of filtered signal as markers for ventilations - 0 - ] - - etco2time = cotime[et_index] # take elements on this markers + co2 = sgn.sosfiltfilt(but, co) + et_index = sgn.find_peaks(co2, distance=1 * freq, height=etco2_threshold)[0] + resp_index = sgn.find_peaks(-co2, distance=1 * freq, height=-breath_threshold)[0] + etco2time = cotime[et_index] etco2 = co[et_index] resptime = cotime[resp_index] resp_height = co[resp_index] - - # initialize search for other markers + + # --- original correction logic as before --- k = 0 del_resp = [] more_resp_flag = False min_resp_height = np.nan min_resp_index = np.nan - - # look a signal before first ventilation - co2_maxtime = etco2time[(etco2time < resptime[0])] co2_max = etco2[(etco2time < resptime[0])] netco2 = len(co2_maxtime) - - # when there is more than a single maximum before first respiration, take only largest one if netco2 > 1: k_max = np.argmax(co2_max) for j in range(k + k_max, k, -1): @@ -1599,23 +1572,14 @@ def compute_etco2_and_ventilations( etco2 = np.delete(etco2, k + 1) etco2time = np.delete(etco2time, k + 1) k += 1 - - # if there is no maximum - elif netco2 == 0: - pass - # if there is a single maximum - else: + elif netco2 == 1: k += 1 - for i, resp in enumerate(resptime[:-1]): next_resp = resptime[i + 1] - # check maxima until next respiration same as bevfore - co2_maxtime = etco2time[ - (etco2time >= resp) & (etco2time < next_resp) - ] + co2_maxtime = etco2time[(etco2time >= resp) & (etco2time < next_resp)] co2_max = etco2[(etco2time >= resp) & (etco2time < next_resp)] netco2 = len(co2_maxtime) - if netco2 > 1: # take largest one + if netco2 > 1: k_max = np.argmax(co2_max) for j in range(k + k_max, k, -1): etco2 = np.delete(etco2, k) @@ -1637,34 +1601,83 @@ def compute_etco2_and_ventilations( del_resp.append(i) min_resp_height = resp_height[i + 1] min_resp_index = i + 1 - more_resp_flag = True else: del_resp.append(i + 1) min_resp_height = resp_height[i] min_resp_index = i more_resp_flag = True - else: more_resp_flag = False k += 1 - del_resp.sort() for i in del_resp[::-1]: resptime = np.delete(resptime, i) + return resptime, etco2time, etco2 + + # Mode 2: FILTER (KERN) INCL. DETECTION OF EXP ONSET + def _run_filter_onset(): + """ + Enhanced ventilation detection using the KERN filter: + for each detected breath, the exhalation onset is identified + by tracing backward from the etCO₂ peak to the baseline threshold, + constrained between the detected respiration time and the etCO₂ peak. + This corresponds to the physiologically plausible start of expiration + in noisy time-resolved capnogram. + + Returns: + resptime : original respiration times + etco2time : original times of etCO2 peaks + etco2 : original etCO2 peak values + exptime : computed exhalation onset times (based on breath_threshold) + """ + + # Step 1: Run original filter-mode to get candidate breath times + resptime, etco2time, etco2 = _run_filter() + + # List to store the detected exhalation start times + exptime = [] + + # Step 2: Loop over each detected respiration + for r in resptime: + # Find the next etCO2 peak that occurs after the current respiration + future_ets = etco2time[etco2time > r] + if len(future_ets) == 0: + # No future peak found (edge case) -> skip this respiration + continue + next_et = future_ets[0] + + # Step 3: Convert times to indices in the raw CO2 waveform + i_et = np.searchsorted(cotime, next_et) # index of etCO2 peak + i_r = np.searchsorted(cotime, r) # index of respiration start + + # Step 4: Search backwards from the etCO2 peak to find the breath_threshold + # (the point just after the CO2 began to rise) + exp_found = False + for i in range(i_et, i_r, -1): + if co[i] <= breath_threshold: + # breath_threshold of waveform found -> mark as exhalation start + exptime.append(cotime[i]) + exp_found = True + break + + # Step 5: Fallback: if no baseline found, use original respiration time + if not exp_found: + exptime.append(r) + + # Step 6: Return results + return resptime, etco2time, etco2, exptime - elif mode == "threshold": # Aramendi et al., 2016 + # Mode 3: THRESHOLD (Aramendi) + def _run_threshold(): but = sgn.butter(4, 10 * 2 / freq, btype="lowpass", output="sos") - co2 = sgn.sosfiltfilt(but, co) # Filter forwarsd and backward + co2 = sgn.sosfiltfilt(but, co) d = freq * (co2[1:] - co2[:-1]) exp_index2 = sgn.find_peaks(d, height=0.35 * freq)[0] ins_index2 = sgn.find_peaks(-d, height=0.45 * freq)[0] - final_flag = False - ins_index3 = [] - exp_index3 = [] - j_ins = 0 - j_exp = 0 + ins_index3, exp_index3 = [], [] + j_ins, j_exp = 0, 0 while not final_flag: ins_index3.append(ins_index2[j_ins]) while exp_index2[j_exp] < ins_index2[j_ins]: @@ -1678,75 +1691,64 @@ def compute_etco2_and_ventilations( if j_ins == len(ins_index2) - 1: final_flag = True break - - resptime = [] - etco2time = [] - etco2 = [] - Th1_list = [5 for i in range(5)] - Th2_list = [0.5 for i in range(5)] - Th3_list = [0 for i in range(5)] - + resptime, etco2time, etco2 = [], [], [] + Th1_list = [5] * 5 + Th2_list = [0.5] * 5 + Th3_list = [0] * 5 k = 0 - for i_ins, i_exp, i_next_ins in zip( - ins_index3[:-1], exp_index3[:-1], ins_index3[1:] - ): + for i_ins, i_exp, i_next_ins in zip(ins_index3[:-1], exp_index3[:-1], ins_index3[1:]): D = (i_exp - i_ins) / freq A_exp = 1 / (i_next_ins - i_exp) * np.sum(co2[i_exp:i_next_ins]) A_ins = 1 / (freq * D) * np.sum(co2[i_ins:i_exp]) A_r = (A_exp - A_ins) / A_exp - S = 1 / freq * np.sum(co2[i_exp : i_exp + int(freq)]) - if len(resptime) > 0: - t_ref = pd.Timedelta( - (cotime[i_exp] - resptime[-1]) - ).total_seconds() - else: - t_ref = 2 # if t_ref >1.5 then it is ok, so 2 does the job - if D > 0.3: - if ( - A_exp > 0.4 * np.mean(Th1_list) - and A_r > np.min([0.7 * np.mean(Th2_list), 0.5]) - and S > 0.4 * np.mean(Th3_list) - ): - if t_ref > 1.5: - resptime.append(cotime[i_exp]) - Th1_list[k] = A_exp - Th2_list[k] = A_r - Th3_list[k] = S - etco2time.append( - cotime[i_exp + np.argmax(co2[i_exp:i_next_ins])] - ) - etco2.append(np.max(co2[i_exp:i_next_ins])) - k += 1 - k = k % 5 - if mode == "threshold" or mode == "filter": - metadata = { - "creator": "automatic", - "creation_date": pd.Timestamp.now(), - "creation_mode": mode, - } - etco2_lab = Label( - "etco2_from_capnography", - time_index=etco2time, - data=etco2, - metadata=metadata, - plotstyle=DEFAULT_PLOT_STYLE.get("etco2_from_capnography", None), - ) - source.attach_label(etco2_lab) - - vent_lab = Label( - "ventilations_from_capnography", - time_index=resptime, + S = 1 / freq * np.sum(co2[i_exp:i_exp + int(freq)]) + t_ref = 2 if len(resptime) == 0 else pd.Timedelta(cotime[i_exp] - resptime[-1]).total_seconds() + if D > 0.3 and A_exp > 0.4 * np.mean(Th1_list) and A_r > min(0.7 * np.mean(Th2_list), 0.5) and S > 0.4 * np.mean(Th3_list): + if t_ref > 1.5: + resptime.append(cotime[i_exp]) + Th1_list[k] = A_exp + Th2_list[k] = A_r + Th3_list[k] = S + etco2time.append(cotime[i_exp + np.argmax(co2[i_exp:i_next_ins])]) + etco2.append(np.max(co2[i_exp:i_next_ins])) + k = (k + 1) % 5 + return resptime, etco2time, etco2 + + # --- RUN MODE --- + if mode == "filter": + resptime, etco2time, etco2 = _run_filter() + exptime = None + elif mode == "filter_onset": + resptime, etco2time, etco2, exptime = _run_filter_onset() + elif mode == "threshold": + resptime, etco2time, etco2 = _run_threshold() + exptime = None + else: + raise ValueError(f"Unknown mode: {mode}") + + # Attach labels + source.attach_label(Label( + "etco2_from_capnography", + time_index=etco2time, + data=etco2, + metadata=metadata, + plotstyle=DEFAULT_PLOT_STYLE.get("etco2_from_capnography", None), + )) + source.attach_label(Label( + "ventilations_from_capnography", + time_index=resptime, + data=None, + metadata=metadata, + plotstyle=DEFAULT_PLOT_STYLE.get("ventilations_from_capnography", None), + )) + if exptime is not None: + source.attach_label(Label( + "exhalation_onsets", + time_index=exptime, data=None, metadata=metadata, - plotstyle=DEFAULT_PLOT_STYLE.get( - "ventilations_from_capnography", None - ), - ) - source.attach_label(vent_lab) - else: - logger.error( - f"mode {mode} not known. Please use either 'filter' or 'threshold' as argument" - ) + plotstyle=DEFAULT_PLOT_STYLE.get("exhalation_onsets", None), + )) def cycle_duration_analysis( self,