Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
290 changes: 146 additions & 144 deletions src/vitabel/vitals.py
Original file line number Diff line number Diff line change
Expand Up @@ -1503,93 +1503,66 @@ def shocks(self):
"Error. Different Defibrillation keys contain different timestamp. Cannot construct single DataFrame from this"
)
return None

###
def compute_etco2_and_ventilations(
self,
source: Channel | str = "capnography",
mode: Literal['threshold', 'filter'] = 'filter',
mode: Literal["filter", "filter_onset", "threshold"] = "filter",
breath_threshold: float = 2,
etco2_threshold: float = 3,
**kwargs,
):
"""Computes end-tidal CO2 (etCO₂) values and timestamps of ventilations from the
capnography waveform, and adds them as labels.

The capnography signal must be present as a channel named 'capnography'. Two
detection methods are supported:

- ``'filter'``: An unpublished method by Wolfgang Kern (default).
- ``'threshold'``: The method described by Aramendi et al. in
:cite:`10.1016/j.resuscitation.2016.08.033`.

Parameters
----------
mode
Method to use for detecting ventilations from the CO₂ signal.
breath_threshold
Threshold below which a minimum is identified as a ventilation (default: 2 mmHg).
Used by the ``'filter'`` method.
etco2_threshold
Threshold above which a maximum is identified as an etCO₂ value of an expiration
(default: 3 mmHg). Used by the ``'filter'`` method.
"""
# Support legacy parameter name
Computes etCO₂ values and timestamps of ventilations from capnography waveform.
Supports three methods:
- 'filter': Wolfgang Kern (unpublished)
- 'filter_onset': like 'filter' + detects exhalation onset
- 'threshold': Aramendi et al., 2016
"""

if 'breaththresh' in kwargs:
if breath_threshold is not None:
raise TypeError("Cannot specify both 'breath_threshold' and legacy 'breaththresh'")
breath_threshold = kwargs.pop('breaththresh')
logger.warning(
"The keyword argument breaththresh is deprecated, "
"use breath_threshold instead"
)

logger.warning("The keyword argument breaththresh is deprecated, use breath_threshold instead")

if isinstance(source, str):
source = self.get_channel(name=source)

if not isinstance(source, Channel):
raise ValueError(
f"No valid capnography channel specified. Please make sure a channel named '{source}' "
"is added to the collection, or specify a suitable channel via "
"the source argument directly or by name"
)

co2_data = source.get_data() # get data
cotime, co = co2_data.time_index, co2_data.data

freq = np.timedelta64(1, "s") / np.nanmedian(cotime.diff())
cotime = np.asarray(cotime)
co = np.asarray(co)
if mode == "filter": # Wolfgang Kern's unpublished method
raise ValueError("Invalid capnography channel specified.")

# Helper: metadata for all labels
metadata = {
"creator": "automatic",
"creation_date": pd.Timestamp.now(),
"creation_mode": mode,
}

# Load signal
co2_data = source.get_data()
cotime, co = np.asarray(co2_data.time_index), np.asarray(co2_data.data)
freq = np.timedelta64(1, "s") / np.nanmedian(np.diff(cotime))

# Mode 1: FILTER (Kern original)
def _run_filter():
but = sgn.butter(4, 1 * 2 / freq, btype="lowpass", output="sos")
co2 = sgn.sosfiltfilt(but, co) # Filter forwarsd and backward
et_index = sgn.find_peaks(co2, distance=1 * freq, height=etco2_threshold)[
0
] # find peaks of filtered signal as markers for etco2
resp_index = sgn.find_peaks(
-co2, distance=1 * freq, height=-breath_threshold
)[ # find dips of filtered signal as markers for ventilations
0
]

etco2time = cotime[et_index] # take elements on this markers
co2 = sgn.sosfiltfilt(but, co)
et_index = sgn.find_peaks(co2, distance=1 * freq, height=etco2_threshold)[0]
resp_index = sgn.find_peaks(-co2, distance=1 * freq, height=-breath_threshold)[0]
etco2time = cotime[et_index]
etco2 = co[et_index]
resptime = cotime[resp_index]
resp_height = co[resp_index]

# initialize search for other markers
# --- original correction logic as before ---
k = 0
del_resp = []
more_resp_flag = False
min_resp_height = np.nan
min_resp_index = np.nan

# look a signal before first ventilation

co2_maxtime = etco2time[(etco2time < resptime[0])]
co2_max = etco2[(etco2time < resptime[0])]
netco2 = len(co2_maxtime)

# when there is more than a single maximum before first respiration, take only largest one
if netco2 > 1:
k_max = np.argmax(co2_max)
for j in range(k + k_max, k, -1):
Expand All @@ -1599,23 +1572,14 @@ def compute_etco2_and_ventilations(
etco2 = np.delete(etco2, k + 1)
etco2time = np.delete(etco2time, k + 1)
k += 1

# if there is no maximum
elif netco2 == 0:
pass
# if there is a single maximum
else:
elif netco2 == 1:
k += 1

for i, resp in enumerate(resptime[:-1]):
next_resp = resptime[i + 1]
# check maxima until next respiration same as bevfore
co2_maxtime = etco2time[
(etco2time >= resp) & (etco2time < next_resp)
]
co2_maxtime = etco2time[(etco2time >= resp) & (etco2time < next_resp)]
co2_max = etco2[(etco2time >= resp) & (etco2time < next_resp)]
netco2 = len(co2_maxtime)
if netco2 > 1: # take largest one
if netco2 > 1:
k_max = np.argmax(co2_max)
for j in range(k + k_max, k, -1):
etco2 = np.delete(etco2, k)
Expand All @@ -1637,34 +1601,83 @@ def compute_etco2_and_ventilations(
del_resp.append(i)
min_resp_height = resp_height[i + 1]
min_resp_index = i + 1

more_resp_flag = True
else:
del_resp.append(i + 1)
min_resp_height = resp_height[i]
min_resp_index = i
more_resp_flag = True

else:
more_resp_flag = False
k += 1

del_resp.sort()
for i in del_resp[::-1]:
resptime = np.delete(resptime, i)
return resptime, etco2time, etco2

# Mode 2: FILTER (KERN) INCL. DETECTION OF EXP ONSET
def _run_filter_onset():
"""
Enhanced ventilation detection using the KERN filter:
for each detected breath, the exhalation onset is identified
by tracing backward from the etCO₂ peak to the baseline threshold,
constrained between the detected respiration time and the etCO₂ peak.
This corresponds to the physiologically plausible start of expiration
in noisy time-resolved capnogram.

Returns:
resptime : original respiration times
etco2time : original times of etCO2 peaks
etco2 : original etCO2 peak values
exptime : computed exhalation onset times (based on breath_threshold)
"""

# Step 1: Run original filter-mode to get candidate breath times
resptime, etco2time, etco2 = _run_filter()

# List to store the detected exhalation start times
exptime = []

# Step 2: Loop over each detected respiration
for r in resptime:
# Find the next etCO2 peak that occurs after the current respiration
future_ets = etco2time[etco2time > r]
if len(future_ets) == 0:
# No future peak found (edge case) -> skip this respiration
continue
next_et = future_ets[0]

# Step 3: Convert times to indices in the raw CO2 waveform
i_et = np.searchsorted(cotime, next_et) # index of etCO2 peak
i_r = np.searchsorted(cotime, r) # index of respiration start

# Step 4: Search backwards from the etCO2 peak to find the breath_threshold
# (the point just after the CO2 began to rise)
exp_found = False
for i in range(i_et, i_r, -1):
if co[i] <= breath_threshold:
# breath_threshold of waveform found -> mark as exhalation start
exptime.append(cotime[i])
exp_found = True
break

# Step 5: Fallback: if no baseline found, use original respiration time
if not exp_found:
exptime.append(r)

# Step 6: Return results
return resptime, etco2time, etco2, exptime

elif mode == "threshold": # Aramendi et al., 2016
# Mode 3: THRESHOLD (Aramendi)
def _run_threshold():
but = sgn.butter(4, 10 * 2 / freq, btype="lowpass", output="sos")
co2 = sgn.sosfiltfilt(but, co) # Filter forwarsd and backward
co2 = sgn.sosfiltfilt(but, co)
d = freq * (co2[1:] - co2[:-1])
exp_index2 = sgn.find_peaks(d, height=0.35 * freq)[0]
ins_index2 = sgn.find_peaks(-d, height=0.45 * freq)[0]

final_flag = False
ins_index3 = []
exp_index3 = []
j_ins = 0
j_exp = 0
ins_index3, exp_index3 = [], []
j_ins, j_exp = 0, 0
while not final_flag:
ins_index3.append(ins_index2[j_ins])
while exp_index2[j_exp] < ins_index2[j_ins]:
Expand All @@ -1678,75 +1691,64 @@ def compute_etco2_and_ventilations(
if j_ins == len(ins_index2) - 1:
final_flag = True
break

resptime = []
etco2time = []
etco2 = []
Th1_list = [5 for i in range(5)]
Th2_list = [0.5 for i in range(5)]
Th3_list = [0 for i in range(5)]

resptime, etco2time, etco2 = [], [], []
Th1_list = [5] * 5
Th2_list = [0.5] * 5
Th3_list = [0] * 5
k = 0
for i_ins, i_exp, i_next_ins in zip(
ins_index3[:-1], exp_index3[:-1], ins_index3[1:]
):
for i_ins, i_exp, i_next_ins in zip(ins_index3[:-1], exp_index3[:-1], ins_index3[1:]):
D = (i_exp - i_ins) / freq
A_exp = 1 / (i_next_ins - i_exp) * np.sum(co2[i_exp:i_next_ins])
A_ins = 1 / (freq * D) * np.sum(co2[i_ins:i_exp])
A_r = (A_exp - A_ins) / A_exp
S = 1 / freq * np.sum(co2[i_exp : i_exp + int(freq)])
if len(resptime) > 0:
t_ref = pd.Timedelta(
(cotime[i_exp] - resptime[-1])
).total_seconds()
else:
t_ref = 2 # if t_ref >1.5 then it is ok, so 2 does the job
if D > 0.3:
if (
A_exp > 0.4 * np.mean(Th1_list)
and A_r > np.min([0.7 * np.mean(Th2_list), 0.5])
and S > 0.4 * np.mean(Th3_list)
):
if t_ref > 1.5:
resptime.append(cotime[i_exp])
Th1_list[k] = A_exp
Th2_list[k] = A_r
Th3_list[k] = S
etco2time.append(
cotime[i_exp + np.argmax(co2[i_exp:i_next_ins])]
)
etco2.append(np.max(co2[i_exp:i_next_ins]))
k += 1
k = k % 5
if mode == "threshold" or mode == "filter":
metadata = {
"creator": "automatic",
"creation_date": pd.Timestamp.now(),
"creation_mode": mode,
}
etco2_lab = Label(
"etco2_from_capnography",
time_index=etco2time,
data=etco2,
metadata=metadata,
plotstyle=DEFAULT_PLOT_STYLE.get("etco2_from_capnography", None),
)
source.attach_label(etco2_lab)

vent_lab = Label(
"ventilations_from_capnography",
time_index=resptime,
S = 1 / freq * np.sum(co2[i_exp:i_exp + int(freq)])
t_ref = 2 if len(resptime) == 0 else pd.Timedelta(cotime[i_exp] - resptime[-1]).total_seconds()
if D > 0.3 and A_exp > 0.4 * np.mean(Th1_list) and A_r > min(0.7 * np.mean(Th2_list), 0.5) and S > 0.4 * np.mean(Th3_list):
if t_ref > 1.5:
resptime.append(cotime[i_exp])
Th1_list[k] = A_exp
Th2_list[k] = A_r
Th3_list[k] = S
etco2time.append(cotime[i_exp + np.argmax(co2[i_exp:i_next_ins])])
etco2.append(np.max(co2[i_exp:i_next_ins]))
k = (k + 1) % 5
return resptime, etco2time, etco2

# --- RUN MODE ---
if mode == "filter":
resptime, etco2time, etco2 = _run_filter()
exptime = None
elif mode == "filter_onset":
resptime, etco2time, etco2, exptime = _run_filter_onset()
elif mode == "threshold":
resptime, etco2time, etco2 = _run_threshold()
exptime = None
else:
raise ValueError(f"Unknown mode: {mode}")

# Attach labels
source.attach_label(Label(
"etco2_from_capnography",
time_index=etco2time,
data=etco2,
metadata=metadata,
plotstyle=DEFAULT_PLOT_STYLE.get("etco2_from_capnography", None),
))
source.attach_label(Label(
"ventilations_from_capnography",
time_index=resptime,
data=None,
metadata=metadata,
plotstyle=DEFAULT_PLOT_STYLE.get("ventilations_from_capnography", None),
))
if exptime is not None:
source.attach_label(Label(
"exhalation_onsets",
time_index=exptime,
data=None,
metadata=metadata,
plotstyle=DEFAULT_PLOT_STYLE.get(
"ventilations_from_capnography", None
),
)
source.attach_label(vent_lab)
else:
logger.error(
f"mode {mode} not known. Please use either 'filter' or 'threshold' as argument"
)
plotstyle=DEFAULT_PLOT_STYLE.get("exhalation_onsets", None),
))

def cycle_duration_analysis(
self,
Expand Down
Loading