From 66faeaaf9605946f3fa34d9b33c65f7157fe833a Mon Sep 17 00:00:00 2001 From: vmcru Date: Tue, 4 Mar 2025 09:20:46 -0500 Subject: [PATCH 01/36] initial test for ica dynamic item inclusion --- benchmarks/MOABB/dataio/datasets.py | 7 +++ benchmarks/MOABB/dataio/ica.py | 59 +++++++++++++++++ benchmarks/MOABB/validate_ica.py | 98 +++++++++++++++++++++++++++++ 3 files changed, 164 insertions(+) create mode 100644 benchmarks/MOABB/dataio/ica.py create mode 100644 benchmarks/MOABB/validate_ica.py diff --git a/benchmarks/MOABB/dataio/datasets.py b/benchmarks/MOABB/dataio/datasets.py index 0016c3f96..d20fd5049 100644 --- a/benchmarks/MOABB/dataio/datasets.py +++ b/benchmarks/MOABB/dataio/datasets.py @@ -23,6 +23,8 @@ from torch.utils.data import Dataset +from .ica import ICAProcessor + class RawEEGSample(TypedDict, total=False): """Default dictionary keys provided by `~RawEEGDataset`. @@ -94,10 +96,12 @@ def __init__( data, preload=False, verbose=None, + ica_processor: Optional[ICAProcessor] = None, dynamic_items=(), output_keys=(), ): self.verbose = verbose + self.ica_processor = ica_processor dynamic_items = [self._make_load_raw_dynamic_item(preload)] + list( dynamic_items ) @@ -297,6 +301,9 @@ def _make_load_raw_dynamic_item(self, preload: bool): @provides("info", "raw") def _load_raw(fpath: str): raw = self._read_raw_bids_cached(fpath, preload) + + if self.ica_processor is not None: + raw = self.ica_processor.process(raw, fpath) yield raw.info yield raw diff --git a/benchmarks/MOABB/dataio/ica.py b/benchmarks/MOABB/dataio/ica.py new file mode 100644 index 000000000..0cc3c220e --- /dev/null +++ b/benchmarks/MOABB/dataio/ica.py @@ -0,0 +1,59 @@ +from pathlib import Path +from typing import Union + +import mne +from mne.preprocessing import ICA + + +class ICAProcessor: + """Handles ICA computation and application for EEG data. + + Arguments + --------- + n_components : int | float | None + Number of components to keep during ICA decomposition + random_state : int | None + Random state for reproducibility + """ + + def __init__(self, n_components=None, random_state=42): + self.n_components = n_components + self.random_state = random_state + + def get_ica_path(self, raw_path: Union[str, Path]) -> Path: + """Generate path where ICA solution should be stored.""" + path = Path(raw_path) + return path.parent / f"{path.stem}_ica.fif" + + def compute_ica(self, raw: mne.io.RawArray, ica_path: Path) -> ICA: + """Compute ICA solution and save to disk.""" + # High-pass filter for ICA + raw_filtered = raw.copy() + raw_filtered.filter(l_freq=1.0, h_freq=None) + + ica = ICA( + n_components=self.n_components, + random_state=self.random_state + ) + ica.fit(raw) + ica.save(ica_path) + return ica + + + def process(self, raw: mne.io.RawArray, raw_path: Union[str, Path]) -> mne.io.RawArray: + """Process raw data with ICA, computing or loading from cache.""" + if not raw.preload: + raw.load_data() + + ica_path = self.get_ica_path(raw_path) + + if not ica_path.exists(): + ica = self.compute_ica(raw, ica_path) + else: + ica = mne.preprocessing.read_ica(ica_path) + + # Create a copy of the raw data before applying ICA + raw_ica = raw.copy() + ica.apply(raw_ica) + + return raw_ica \ No newline at end of file diff --git a/benchmarks/MOABB/validate_ica.py b/benchmarks/MOABB/validate_ica.py new file mode 100644 index 000000000..82a54037b --- /dev/null +++ b/benchmarks/MOABB/validate_ica.py @@ -0,0 +1,98 @@ +import logging +import os +from pathlib import Path +import time +import mne +import moabb +from moabb.datasets import BNCI2014_001 +from memory_profiler import profile + +from dataio.datasets import EpochedEEGDataset, RawEEGDataset, InMemoryDataset +from dataio.ica import ICAProcessor + +# Set up logging +mne.set_log_level(verbose=False) +moabb.set_log_level(level="ERROR") + +def test_ica_processing(): + # Test without ICA first + print("\nTesting without ICA:") + dataset_no_ica = EpochedEEGDataset.from_moabb( + BNCI2014_001(), + "data/MNE-BIDS-bnci2014-001-epoched.json", + save_path="data", + tmin=0, + tmax=4.0, + output_keys=["label", "subject", "session", "epoch"], + ) + + # Time iteration + start = time.time() + for _ in dataset_no_ica: + pass + print(f"Time without ICA: {time.time() - start:.2f}s") + + # Test with ICA + print("\nTesting with ICA:") + ica_processor = ICAProcessor(n_components=15) + dataset_with_ica = EpochedEEGDataset.from_moabb( + BNCI2014_001(), + "data/MNE-BIDS-bnci2014-001-epoched-ica.json", + save_path="data", + tmin=0, + tmax=4.0, + preload=True, + output_keys=["label", "subject", "session", "epoch"], # Removed ica_path + ica_processor=ica_processor + ) + + # First run - ICA computation and caching + print("First run (computing ICA):") + start = time.time() + for _ in dataset_with_ica: + pass + print(f"Time with ICA (first run): {time.time() - start:.2f}s") + + # Second run - should use cached ICA + print("\nSecond run (using cached ICA):") + start = time.time() + for _ in dataset_with_ica: + pass + print(f"Time with ICA (cached): {time.time() - start:.2f}s") + + # Test with InMemoryDataset wrapper + print("\nTesting with InMemoryDataset wrapper:") + dataset_with_ica_cached = InMemoryDataset(dataset_with_ica) + + start = time.time() + for _ in dataset_with_ica_cached: + pass + print(f"Time with ICA (in-memory cache): {time.time() - start:.2f}s") + + # Print some sample info + sample = dataset_with_ica[0] + print("\nSample info:") + print(f"Epoch shape: {sample['epoch'].shape}") + +@profile +def profile_memory_usage(): + ica_processor = ICAProcessor(n_components=15) + dataset = EpochedEEGDataset.from_moabb( + BNCI2014_001(), + "data/MNE-BIDS-bnci2014-001-epoched-ica.json", + save_path="data", + tmin=0, + tmax=4.0, + output_keys=["label", "subject", "session", "epoch"], # Removed ica_path + ica_processor=ica_processor + ) + + for _ in dataset: + pass + +if __name__ == "__main__": + print("Running performance tests...") + test_ica_processing() + + print("\nRunning memory profile...") + profile_memory_usage() \ No newline at end of file From 4b62e604d23fd117a2d6066588ed2a1232101f4f Mon Sep 17 00:00:00 2001 From: vmcru Date: Wed, 5 Mar 2025 00:14:46 -0500 Subject: [PATCH 02/36] exposed method for ica, used mne BIDSPath, path needs checking for proper caching usage. ica still not leveraging caching correctly --- benchmarks/MOABB/dataio/ica.py | 56 +++++++++--- benchmarks/MOABB/validate_ica.py | 143 ++++++++++++++++++++----------- 2 files changed, 141 insertions(+), 58 deletions(-) diff --git a/benchmarks/MOABB/dataio/ica.py b/benchmarks/MOABB/dataio/ica.py index 0cc3c220e..362988152 100644 --- a/benchmarks/MOABB/dataio/ica.py +++ b/benchmarks/MOABB/dataio/ica.py @@ -1,8 +1,9 @@ from pathlib import Path -from typing import Union +from typing import Union, Optional, Dict, Any import mne from mne.preprocessing import ICA +from mne_bids import get_bids_path_from_fname, BIDSPath class ICAProcessor: @@ -12,34 +13,69 @@ class ICAProcessor: --------- n_components : int | float | None Number of components to keep during ICA decomposition + method : str + The ICA method to use. Can be 'fastica', 'infomax' or 'picard'. + Defaults to 'fastica'. random_state : int | None Random state for reproducibility + fit_params : dict | None + Additional parameters to pass to the ICA fit method. + See mne.preprocessing.ICA for details. + filter_params : dict | None + Parameters for the high-pass filter applied before ICA. + Defaults to {'l_freq': 1.0, 'h_freq': None} """ - def __init__(self, n_components=None, random_state=42): + def __init__( + self, + n_components=None, + method='fastica', + random_state=42, + fit_params: Optional[Dict[str, Any]] = None, + filter_params: Optional[Dict[str, Any]] = None, + ): self.n_components = n_components + self.method = method self.random_state = random_state + self.fit_params = fit_params or {} + self.filter_params = filter_params or {'l_freq': 1.0, 'h_freq': None} def get_ica_path(self, raw_path: Union[str, Path]) -> Path: - """Generate path where ICA solution should be stored.""" - path = Path(raw_path) - return path.parent / f"{path.stem}_ica.fif" + """Generate path where ICA solution should be stored. + + Creates a derivatives folder to store ICA solutions, following BIDS conventions. + """ + bids_path = get_bids_path_from_fname(raw_path) + # For derivatives, you can put them in a derivatives folder: + bids_path.root = (bids_path.root / ".." / "derivatives" / f"ica-{self.method}") + # Keep the same base entities: + bids_path.update( + suffix='eeg', # override or confirm suffix + extension='.fif', + description='ica', # <-- This sets a desc=ica entity + check=True, # If you do not want BIDSPath to fail on derivative checks + ) + # Make sure the folder is created + bids_path.fpath.parent.mkdir(parents=True, exist_ok=True) + + return bids_path.fpath def compute_ica(self, raw: mne.io.RawArray, ica_path: Path) -> ICA: """Compute ICA solution and save to disk.""" # High-pass filter for ICA raw_filtered = raw.copy() - raw_filtered.filter(l_freq=1.0, h_freq=None) + raw_filtered.filter(**self.filter_params) ica = ICA( n_components=self.n_components, - random_state=self.random_state + method=self.method, + random_state=self.random_state, + **self.fit_params ) - ica.fit(raw) + ica.fit(raw_filtered) ica.save(ica_path) return ica - def process(self, raw: mne.io.RawArray, raw_path: Union[str, Path]) -> mne.io.RawArray: """Process raw data with ICA, computing or loading from cache.""" if not raw.preload: @@ -50,7 +86,7 @@ def process(self, raw: mne.io.RawArray, raw_path: Union[str, Path]) -> mne.io.Ra if not ica_path.exists(): ica = self.compute_ica(raw, ica_path) else: - ica = mne.preprocessing.read_ica(ica_path) + ica = mne.preprocessing.read_ica(ica_path, verbose='ERROR') # Create a copy of the raw data before applying ICA raw_ica = raw.copy() diff --git a/benchmarks/MOABB/validate_ica.py b/benchmarks/MOABB/validate_ica.py index 82a54037b..8e2d3eaa3 100644 --- a/benchmarks/MOABB/validate_ica.py +++ b/benchmarks/MOABB/validate_ica.py @@ -14,85 +14,132 @@ mne.set_log_level(verbose=False) moabb.set_log_level(level="ERROR") -def test_ica_processing(): - # Test without ICA first - print("\nTesting without ICA:") - dataset_no_ica = EpochedEEGDataset.from_moabb( - BNCI2014_001(), - "data/MNE-BIDS-bnci2014-001-epoched.json", - save_path="data", - tmin=0, - tmax=4.0, - output_keys=["label", "subject", "session", "epoch"], +def test_ica_method(method: str, n_components: int = 15, **kwargs): + """Test a specific ICA method and return timing results.""" + print(f"\nTesting ICA method: {method}") + ica_processor = ICAProcessor( + n_components=n_components, + method=method, + **kwargs ) - # Time iteration - start = time.time() - for _ in dataset_no_ica: - pass - print(f"Time without ICA: {time.time() - start:.2f}s") - - # Test with ICA - print("\nTesting with ICA:") - ica_processor = ICAProcessor(n_components=15) - dataset_with_ica = EpochedEEGDataset.from_moabb( + dataset = EpochedEEGDataset.from_moabb( BNCI2014_001(), - "data/MNE-BIDS-bnci2014-001-epoched-ica.json", + f"data/MNE-BIDS-bnci2014-001-epoched-{method}.json", save_path="data", tmin=0, tmax=4.0, preload=True, - output_keys=["label", "subject", "session", "epoch"], # Removed ica_path + output_keys=["label", "subject", "session", "epoch"], ica_processor=ica_processor ) - # First run - ICA computation and caching + # First run - ICA computation print("First run (computing ICA):") start = time.time() - for _ in dataset_with_ica: + for _ in dataset: pass - print(f"Time with ICA (first run): {time.time() - start:.2f}s") + computation_time = time.time() - start + print(f"Time with {method} ICA (first run): {computation_time:.2f}s") - # Second run - should use cached ICA + # Second run - using cached ICA print("\nSecond run (using cached ICA):") start = time.time() - for _ in dataset_with_ica: + for _ in dataset: pass - print(f"Time with ICA (cached): {time.time() - start:.2f}s") + cached_time = time.time() - start + print(f"Time with {method} ICA (cached): {cached_time:.2f}s") - # Test with InMemoryDataset wrapper + # Memory-cached version print("\nTesting with InMemoryDataset wrapper:") - dataset_with_ica_cached = InMemoryDataset(dataset_with_ica) - + dataset_cached = InMemoryDataset(dataset) start = time.time() - for _ in dataset_with_ica_cached: + for _ in dataset_cached: pass - print(f"Time with ICA (in-memory cache): {time.time() - start:.2f}s") + memory_cached_time = time.time() - start + print(f"Time with {method} ICA (in-memory cache): {memory_cached_time:.2f}s") - # Print some sample info - sample = dataset_with_ica[0] - print("\nSample info:") - print(f"Epoch shape: {sample['epoch'].shape}") + return { + 'method': method, + 'computation_time': computation_time, + 'cached_time': cached_time, + 'memory_cached_time': memory_cached_time + } -@profile -def profile_memory_usage(): - ica_processor = ICAProcessor(n_components=15) - dataset = EpochedEEGDataset.from_moabb( +def compare_ica_methods(): + # Test without ICA first as baseline + print("\nTesting without ICA (baseline):") + dataset_no_ica = EpochedEEGDataset.from_moabb( BNCI2014_001(), - "data/MNE-BIDS-bnci2014-001-epoched-ica.json", + "data/MNE-BIDS-bnci2014-001-epoched.json", save_path="data", tmin=0, tmax=4.0, - output_keys=["label", "subject", "session", "epoch"], # Removed ica_path - ica_processor=ica_processor + output_keys=["label", "subject", "session", "epoch"], ) - - for _ in dataset: + + start = time.time() + for _ in dataset_no_ica: pass + baseline_time = time.time() - start + print(f"Time without ICA: {baseline_time:.2f}s") + + # Test different ICA methods + results = [] + + # Test Picard + results.append(test_ica_method( + 'picard', + n_components=15, + fit_params={'max_iter': 500} + )) + + # Test Infomax + results.append(test_ica_method( + 'infomax', + n_components=15, + fit_params={'max_iter': 1000} + )) + + # Print comparison + print("\nComparison Summary:") + print("-" * 50) + print(f"Baseline (no ICA): {baseline_time:.2f}s") + print("-" * 50) + for result in results: + print(f"Method: {result['method']}") + print(f" Computation time: {result['computation_time']:.2f}s") + print(f" Cached access time: {result['cached_time']:.2f}s") + print(f" In-memory cached time: {result['memory_cached_time']:.2f}s") + print("-" * 50) + +@profile +def profile_memory_usage(): + # Profile memory usage for both methods + for method in ['picard', 'infomax']: + print(f"\nProfiling {method} ICA:") + ica_processor = ICAProcessor( + n_components=15, + method=method, + fit_params={'max_iter': 500} if method == 'picard' else {'iteration': 1000} + ) + dataset = EpochedEEGDataset.from_moabb( + BNCI2014_001(), + f"data/MNE-BIDS-bnci2014-001-epoched-{method}.json", + save_path="data", + tmin=0, + tmax=4.0, + preload=True, + output_keys=["label", "subject", "session", "epoch"], + ica_processor=ica_processor + ) + + for _ in dataset: + pass if __name__ == "__main__": - print("Running performance tests...") - test_ica_processing() + print("Running ICA method comparison...") + compare_ica_methods() print("\nRunning memory profile...") profile_memory_usage() \ No newline at end of file From c3ec3dc5710949037a7ed73bf85709d75b0dbe0c Mon Sep 17 00:00:00 2001 From: Bru Date: Wed, 5 Mar 2025 18:09:37 +0000 Subject: [PATCH 03/36] Apply suggestions from code review --- benchmarks/MOABB/dataio/ica.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/benchmarks/MOABB/dataio/ica.py b/benchmarks/MOABB/dataio/ica.py index 362988152..82d1a7fe6 100644 --- a/benchmarks/MOABB/dataio/ica.py +++ b/benchmarks/MOABB/dataio/ica.py @@ -78,8 +78,6 @@ def compute_ica(self, raw: mne.io.RawArray, ica_path: Path) -> ICA: def process(self, raw: mne.io.RawArray, raw_path: Union[str, Path]) -> mne.io.RawArray: """Process raw data with ICA, computing or loading from cache.""" - if not raw.preload: - raw.load_data() ica_path = self.get_ica_path(raw_path) From 77a8c59d77ac48afe2d241bc1810d26ba16b5831 Mon Sep 17 00:00:00 2001 From: Bru Date: Wed, 5 Mar 2025 18:10:19 +0000 Subject: [PATCH 04/36] Apply suggestions from code review --- benchmarks/MOABB/dataio/ica.py | 2 +- benchmarks/MOABB/validate_ica.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/benchmarks/MOABB/dataio/ica.py b/benchmarks/MOABB/dataio/ica.py index 82d1a7fe6..59a43ed6f 100644 --- a/benchmarks/MOABB/dataio/ica.py +++ b/benchmarks/MOABB/dataio/ica.py @@ -90,4 +90,4 @@ def process(self, raw: mne.io.RawArray, raw_path: Union[str, Path]) -> mne.io.Ra raw_ica = raw.copy() ica.apply(raw_ica) - return raw_ica \ No newline at end of file + return raw_ica diff --git a/benchmarks/MOABB/validate_ica.py b/benchmarks/MOABB/validate_ica.py index 8e2d3eaa3..26a18582b 100644 --- a/benchmarks/MOABB/validate_ica.py +++ b/benchmarks/MOABB/validate_ica.py @@ -142,4 +142,4 @@ def profile_memory_usage(): compare_ica_methods() print("\nRunning memory profile...") - profile_memory_usage() \ No newline at end of file + profile_memory_usage() From 3a78e1d3e0a7267c5bda62f671c7a0b375518883 Mon Sep 17 00:00:00 2001 From: vmcru Date: Thu, 6 Mar 2025 21:25:28 -0500 Subject: [PATCH 05/36] modifications to ica and validate to pass merge --- benchmarks/MOABB/dataio/ica.py | 2 +- benchmarks/MOABB/validate_ica.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/benchmarks/MOABB/dataio/ica.py b/benchmarks/MOABB/dataio/ica.py index 59a43ed6f..4ad893006 100644 --- a/benchmarks/MOABB/dataio/ica.py +++ b/benchmarks/MOABB/dataio/ica.py @@ -3,7 +3,7 @@ import mne from mne.preprocessing import ICA -from mne_bids import get_bids_path_from_fname, BIDSPath +from mne_bids import get_bids_path_from_fname class ICAProcessor: diff --git a/benchmarks/MOABB/validate_ica.py b/benchmarks/MOABB/validate_ica.py index 26a18582b..11152dbed 100644 --- a/benchmarks/MOABB/validate_ica.py +++ b/benchmarks/MOABB/validate_ica.py @@ -121,7 +121,7 @@ def profile_memory_usage(): ica_processor = ICAProcessor( n_components=15, method=method, - fit_params={'max_iter': 500} if method == 'picard' else {'iteration': 1000} + fit_params={'max_iter': 500} if method == 'picard' else {'max_iter': 1000} ) dataset = EpochedEEGDataset.from_moabb( BNCI2014_001(), From 86af6be7f7c2d7de17cd131679e8f7c5136d810f Mon Sep 17 00:00:00 2001 From: vmcru Date: Thu, 6 Mar 2025 21:42:10 -0500 Subject: [PATCH 06/36] removed unused imports from all files and added credits --- benchmarks/MOABB/dataio/datasets.py | 2 +- benchmarks/MOABB/dataio/ica.py | 43 +++++++++++------- benchmarks/MOABB/validate_ica.py | 70 ++++++++++++++++------------- 3 files changed, 65 insertions(+), 50 deletions(-) diff --git a/benchmarks/MOABB/dataio/datasets.py b/benchmarks/MOABB/dataio/datasets.py index 51fb84609..e7bfb348a 100644 --- a/benchmarks/MOABB/dataio/datasets.py +++ b/benchmarks/MOABB/dataio/datasets.py @@ -300,7 +300,7 @@ def _make_load_raw_dynamic_item(self, preload: bool): @provides("info", "raw") def _load_raw(fpath: str): raw = self._read_raw_bids_cached(fpath, preload) - + if self.ica_processor is not None: raw = self.ica_processor.process(raw, fpath) diff --git a/benchmarks/MOABB/dataio/ica.py b/benchmarks/MOABB/dataio/ica.py index 4ad893006..618446847 100644 --- a/benchmarks/MOABB/dataio/ica.py +++ b/benchmarks/MOABB/dataio/ica.py @@ -1,3 +1,8 @@ +"""Module for handling ICA computation and application for EEG data. +Author +------ +Victor Cruz, 2025 +""" from pathlib import Path from typing import Union, Optional, Dict, Any @@ -27,9 +32,9 @@ class ICAProcessor: """ def __init__( - self, - n_components=None, - method='fastica', + self, + n_components=None, + method="fastica", random_state=42, fit_params: Optional[Dict[str, Any]] = None, filter_params: Optional[Dict[str, Any]] = None, @@ -38,22 +43,24 @@ def __init__( self.method = method self.random_state = random_state self.fit_params = fit_params or {} - self.filter_params = filter_params or {'l_freq': 1.0, 'h_freq': None} + self.filter_params = filter_params or {"l_freq": 1.0, "h_freq": None} def get_ica_path(self, raw_path: Union[str, Path]) -> Path: """Generate path where ICA solution should be stored. - + Creates a derivatives folder to store ICA solutions, following BIDS conventions. """ bids_path = get_bids_path_from_fname(raw_path) # For derivatives, you can put them in a derivatives folder: - bids_path.root = (bids_path.root / ".." / "derivatives" / f"ica-{self.method}") + bids_path.root = ( + bids_path.root / ".." / "derivatives" / f"ica-{self.method}" + ) # Keep the same base entities: bids_path.update( - suffix='eeg', # override or confirm suffix - extension='.fif', - description='ica', # <-- This sets a desc=ica entity - check=True, # If you do not want BIDSPath to fail on derivative checks + suffix="eeg", # override or confirm suffix + extension=".fif", + description="ica", # <-- This sets a desc=ica entity + check=True, # If you do not want BIDSPath to fail on derivative checks ) # Make sure the folder is created bids_path.fpath.parent.mkdir(parents=True, exist_ok=True) @@ -70,24 +77,26 @@ def compute_ica(self, raw: mne.io.RawArray, ica_path: Path) -> ICA: n_components=self.n_components, method=self.method, random_state=self.random_state, - **self.fit_params + **self.fit_params, ) ica.fit(raw_filtered) ica.save(ica_path) return ica - def process(self, raw: mne.io.RawArray, raw_path: Union[str, Path]) -> mne.io.RawArray: + def process( + self, raw: mne.io.RawArray, raw_path: Union[str, Path] + ) -> mne.io.RawArray: """Process raw data with ICA, computing or loading from cache.""" - + ica_path = self.get_ica_path(raw_path) - + if not ica_path.exists(): ica = self.compute_ica(raw, ica_path) else: - ica = mne.preprocessing.read_ica(ica_path, verbose='ERROR') - + ica = mne.preprocessing.read_ica(ica_path, verbose="ERROR") + # Create a copy of the raw data before applying ICA raw_ica = raw.copy() ica.apply(raw_ica) - + return raw_ica diff --git a/benchmarks/MOABB/validate_ica.py b/benchmarks/MOABB/validate_ica.py index 11152dbed..1e569f7f7 100644 --- a/benchmarks/MOABB/validate_ica.py +++ b/benchmarks/MOABB/validate_ica.py @@ -1,28 +1,29 @@ -import logging -import os -from pathlib import Path +'''File for testing ICA computation and application for EEG data. +Authors +------- +Victor Cruz, 2025 +''' import time import mne import moabb from moabb.datasets import BNCI2014_001 from memory_profiler import profile -from dataio.datasets import EpochedEEGDataset, RawEEGDataset, InMemoryDataset -from dataio.ica import ICAProcessor +from dataio.datasets import EpochedEEGDataset, InMemoryDataset +from dataio.ica import ICAProcessor # Set up logging mne.set_log_level(verbose=False) moabb.set_log_level(level="ERROR") + def test_ica_method(method: str, n_components: int = 15, **kwargs): """Test a specific ICA method and return timing results.""" print(f"\nTesting ICA method: {method}") ica_processor = ICAProcessor( - n_components=n_components, - method=method, - **kwargs + n_components=n_components, method=method, **kwargs ) - + dataset = EpochedEEGDataset.from_moabb( BNCI2014_001(), f"data/MNE-BIDS-bnci2014-001-epoched-{method}.json", @@ -31,7 +32,7 @@ def test_ica_method(method: str, n_components: int = 15, **kwargs): tmax=4.0, preload=True, output_keys=["label", "subject", "session", "epoch"], - ica_processor=ica_processor + ica_processor=ica_processor, ) # First run - ICA computation @@ -57,15 +58,18 @@ def test_ica_method(method: str, n_components: int = 15, **kwargs): for _ in dataset_cached: pass memory_cached_time = time.time() - start - print(f"Time with {method} ICA (in-memory cache): {memory_cached_time:.2f}s") + print( + f"Time with {method} ICA (in-memory cache): {memory_cached_time:.2f}s" + ) return { - 'method': method, - 'computation_time': computation_time, - 'cached_time': cached_time, - 'memory_cached_time': memory_cached_time + "method": method, + "computation_time": computation_time, + "cached_time": cached_time, + "memory_cached_time": memory_cached_time, } + def compare_ica_methods(): # Test without ICA first as baseline print("\nTesting without ICA (baseline):") @@ -77,7 +81,7 @@ def compare_ica_methods(): tmax=4.0, output_keys=["label", "subject", "session", "epoch"], ) - + start = time.time() for _ in dataset_no_ica: pass @@ -86,20 +90,18 @@ def compare_ica_methods(): # Test different ICA methods results = [] - + # Test Picard - results.append(test_ica_method( - 'picard', - n_components=15, - fit_params={'max_iter': 500} - )) - + results.append( + test_ica_method("picard", n_components=15, fit_params={"max_iter": 500}) + ) + # Test Infomax - results.append(test_ica_method( - 'infomax', - n_components=15, - fit_params={'max_iter': 1000} - )) + results.append( + test_ica_method( + "infomax", n_components=15, fit_params={"max_iter": 1000} + ) + ) # Print comparison print("\nComparison Summary:") @@ -113,15 +115,18 @@ def compare_ica_methods(): print(f" In-memory cached time: {result['memory_cached_time']:.2f}s") print("-" * 50) + @profile def profile_memory_usage(): # Profile memory usage for both methods - for method in ['picard', 'infomax']: + for method in ["picard", "infomax"]: print(f"\nProfiling {method} ICA:") ica_processor = ICAProcessor( n_components=15, method=method, - fit_params={'max_iter': 500} if method == 'picard' else {'max_iter': 1000} + fit_params={"max_iter": 500} + if method == "picard" + else {"max_iter": 1000}, ) dataset = EpochedEEGDataset.from_moabb( BNCI2014_001(), @@ -131,15 +136,16 @@ def profile_memory_usage(): tmax=4.0, preload=True, output_keys=["label", "subject", "session", "epoch"], - ica_processor=ica_processor + ica_processor=ica_processor, ) for _ in dataset: pass + if __name__ == "__main__": print("Running ICA method comparison...") compare_ica_methods() - + print("\nRunning memory profile...") profile_memory_usage() From f29636ce7b0d90eebb62ecc33b65e26edd2b7fae Mon Sep 17 00:00:00 2001 From: vmcru Date: Thu, 6 Mar 2025 21:44:44 -0500 Subject: [PATCH 07/36] updated validate_ica.py --- benchmarks/MOABB/validate_ica.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/benchmarks/MOABB/validate_ica.py b/benchmarks/MOABB/validate_ica.py index 1e569f7f7..8c62b422c 100644 --- a/benchmarks/MOABB/validate_ica.py +++ b/benchmarks/MOABB/validate_ica.py @@ -1,8 +1,8 @@ -'''File for testing ICA computation and application for EEG data. +"""File for testing ICA computation and application for EEG data. Authors ------- Victor Cruz, 2025 -''' +""" import time import mne import moabb From 30ff4be7b91dc5be973b309a1d00414ab1ade224 Mon Sep 17 00:00:00 2001 From: Bru Date: Tue, 18 Mar 2025 14:49:39 +0100 Subject: [PATCH 08/36] Apply suggestions from code review Co-authored-by: Drew Wagner <33100250+Drew-Wagner@users.noreply.github.com> --- benchmarks/MOABB/dataio/datasets.py | 3 --- benchmarks/MOABB/dataio/ica.py | 32 +++++++++++++++++------------ 2 files changed, 19 insertions(+), 16 deletions(-) diff --git a/benchmarks/MOABB/dataio/datasets.py b/benchmarks/MOABB/dataio/datasets.py index e7bfb348a..85923f5f4 100644 --- a/benchmarks/MOABB/dataio/datasets.py +++ b/benchmarks/MOABB/dataio/datasets.py @@ -301,9 +301,6 @@ def _make_load_raw_dynamic_item(self, preload: bool): def _load_raw(fpath: str): raw = self._read_raw_bids_cached(fpath, preload) - if self.ica_processor is not None: - raw = self.ica_processor.process(raw, fpath) - yield raw.info yield raw diff --git a/benchmarks/MOABB/dataio/ica.py b/benchmarks/MOABB/dataio/ica.py index 618446847..1df2b7603 100644 --- a/benchmarks/MOABB/dataio/ica.py +++ b/benchmarks/MOABB/dataio/ica.py @@ -83,20 +83,26 @@ def compute_ica(self, raw: mne.io.RawArray, ica_path: Path) -> ICA: ica.save(ica_path) return ica - def process( - self, raw: mne.io.RawArray, raw_path: Union[str, Path] - ) -> mne.io.RawArray: - """Process raw data with ICA, computing or loading from cache.""" + @property + def dynamic_item(self): + @takes("raw", "fpath") + @provides("raw", "ica_path") + def process( + raw: mne.io.RawArray, fpath: Union[str, Path] + ): + """Process raw data with ICA, computing or loading from cache.""" - ica_path = self.get_ica_path(raw_path) + ica_path = self.get_ica_path(fpath) - if not ica_path.exists(): - ica = self.compute_ica(raw, ica_path) - else: - ica = mne.preprocessing.read_ica(ica_path, verbose="ERROR") + if not ica_path.exists(): + ica = self.compute_ica(raw, ica_path) + else: + ica = mne.preprocessing.read_ica(ica_path, verbose="ERROR") - # Create a copy of the raw data before applying ICA - raw_ica = raw.copy() - ica.apply(raw_ica) + # Create a copy of the raw data before applying ICA + raw_ica = raw.copy() + ica.apply(raw_ica) - return raw_ica + yield raw_ica + yield ica_path + return process From f3330d91e2bbc0a7798a530bdf2e4e05b0f595c6 Mon Sep 17 00:00:00 2001 From: vmcru Date: Sat, 22 Mar 2025 21:49:53 -0400 Subject: [PATCH 09/36] support for ica as process added. --- benchmarks/MOABB/dataio/datasets.py | 4 ++-- benchmarks/MOABB/dataio/ica.py | 1 + benchmarks/MOABB/validate_ica.py | 6 ++++-- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/benchmarks/MOABB/dataio/datasets.py b/benchmarks/MOABB/dataio/datasets.py index 85923f5f4..995039f6a 100644 --- a/benchmarks/MOABB/dataio/datasets.py +++ b/benchmarks/MOABB/dataio/datasets.py @@ -97,12 +97,12 @@ def __init__( data, preload=False, verbose=None, - ica_processor: Optional[ICAProcessor] = None, + #ica_processor: Optional[ICAProcessor] = None, dynamic_items=(), output_keys=(), ): self.verbose = verbose - self.ica_processor = ica_processor + #self.ica_processor = ica_processor dynamic_items = [self._make_load_raw_dynamic_item(preload)] + list( dynamic_items ) diff --git a/benchmarks/MOABB/dataio/ica.py b/benchmarks/MOABB/dataio/ica.py index 1df2b7603..bcf6b6ab8 100644 --- a/benchmarks/MOABB/dataio/ica.py +++ b/benchmarks/MOABB/dataio/ica.py @@ -10,6 +10,7 @@ from mne.preprocessing import ICA from mne_bids import get_bids_path_from_fname +from speechbrain.utils.data_pipeline import provides, takes class ICAProcessor: """Handles ICA computation and application for EEG data. diff --git a/benchmarks/MOABB/validate_ica.py b/benchmarks/MOABB/validate_ica.py index 8c62b422c..3fbcb25ed 100644 --- a/benchmarks/MOABB/validate_ica.py +++ b/benchmarks/MOABB/validate_ica.py @@ -32,7 +32,8 @@ def test_ica_method(method: str, n_components: int = 15, **kwargs): tmax=4.0, preload=True, output_keys=["label", "subject", "session", "epoch"], - ica_processor=ica_processor, + #ica_processor=ica_processor, + dynamic_items=[ica_processor.dynamic_item] ) # First run - ICA computation @@ -136,7 +137,8 @@ def profile_memory_usage(): tmax=4.0, preload=True, output_keys=["label", "subject", "session", "epoch"], - ica_processor=ica_processor, + #ica_processor=ica_processor, + dynamic_items=[ica_processor.dynamic_item] ) for _ in dataset: From 4e8bb551805e56b58112cd2357217c77b174f871 Mon Sep 17 00:00:00 2001 From: vmcru Date: Sun, 23 Mar 2025 14:08:02 -0400 Subject: [PATCH 10/36] Added hashing, setting check, and fixed caching bug. --- benchmarks/MOABB/dataio/ica.py | 78 ++++++++++++-- benchmarks/MOABB/validate_ica.py | 176 +++++++++++++++++++++---------- 2 files changed, 190 insertions(+), 64 deletions(-) diff --git a/benchmarks/MOABB/dataio/ica.py b/benchmarks/MOABB/dataio/ica.py index bcf6b6ab8..00b45aea0 100644 --- a/benchmarks/MOABB/dataio/ica.py +++ b/benchmarks/MOABB/dataio/ica.py @@ -5,6 +5,8 @@ """ from pathlib import Path from typing import Union, Optional, Dict, Any +import json +import hashlib import mne from mne.preprocessing import ICA @@ -39,22 +41,60 @@ def __init__( random_state=42, fit_params: Optional[Dict[str, Any]] = None, filter_params: Optional[Dict[str, Any]] = None, + use_hash: bool = True, ): self.n_components = n_components self.method = method self.random_state = random_state self.fit_params = fit_params or {} self.filter_params = filter_params or {"l_freq": 1.0, "h_freq": None} + self.use_hash = use_hash - def get_ica_path(self, raw_path: Union[str, Path]) -> Path: + def _get_params_hash(self) -> str: + """Generate a short hash of the ICA parameters.""" + # Select critical parameters that affect the ICA computation + # not accessible from ICA object for standarization + critical_params = { + 'n_components': self.n_components, + 'method': self.method, + 'filter_params': self.filter_params + } + # Create a deterministic string representation and hash it + param_str = json.dumps(critical_params, sort_keys=True) + return hashlib.md5(param_str.encode()).hexdigest()[:8] # First 8 chars are enough + + def get_ica_metadata(self) -> Dict: + """ Generate metadata dictionary for the ICA parameters. """ + return { + "n_components": self.n_components, + "method": self.method, + "random_state": self.random_state, + "filter_params": self.filter_params, + "fit_params": self.fit_params + } + + def get_ica_path(self, raw_path: Union[str, Path]) -> tuple[Path, Path]: """Generate path where ICA solution should be stored. Creates a derivatives folder to store ICA solutions, following BIDS conventions. + Returns + ------- + tuple[Path, Path] + Returns (ica_path, metadata_path) """ bids_path = get_bids_path_from_fname(raw_path) + + if self.use_hash: + param_hash = self._get_params_hash() + folder_name = f"ica-{self.method}-{param_hash}" + desc = f"ica{self.method}" + else: + folder_name = f"ica{self.method}" + desc = f"ica-{self.method}" + # For derivatives, you can put them in a derivatives folder: bids_path.root = ( - bids_path.root / ".." / "derivatives" / f"ica-{self.method}" + bids_path.root / ".." / "derivatives" / folder_name ) # Keep the same base entities: bids_path.update( @@ -66,7 +106,30 @@ def get_ica_path(self, raw_path: Union[str, Path]) -> Path: # Make sure the folder is created bids_path.fpath.parent.mkdir(parents=True, exist_ok=True) - return bids_path.fpath + ica_path = bids_path.fpath + metadata_path = ica_path.with_suffix('.json') + + return ica_path, metadata_path + + def save_ica(self, ica: ICA, ica_path: Path, metadata_path: Path): + """Save ICA solution and metadata to disk.""" + # Save ICA solution + ica.save(ica_path, overwrite=True) + + # Save metadata + with metadata_path.open('w') as f: + json.dump(self.get_ica_metadata(), f) + + def check_ica_metadata(self, metadata_path: Path) -> bool: + """Check if existing ICA metadata matches current parameters.""" + if not metadata_path.exists(): + return False + + with metadata_path.open() as f: + saved_metadata = json.load(f) + + current_metadata = self.get_ica_metadata() + return saved_metadata == current_metadata def compute_ica(self, raw: mne.io.RawArray, ica_path: Path) -> ICA: """Compute ICA solution and save to disk.""" @@ -93,12 +156,13 @@ def process( ): """Process raw data with ICA, computing or loading from cache.""" - ica_path = self.get_ica_path(fpath) + ica_path, metadata_path = self.get_ica_path(fpath) - if not ica_path.exists(): - ica = self.compute_ica(raw, ica_path) - else: + if ica_path.exists() and self.check_ica_metadata(metadata_path): ica = mne.preprocessing.read_ica(ica_path, verbose="ERROR") + else: + ica = self.compute_ica(raw, ica_path) + self.save_ica(ica, ica_path, metadata_path) # Create a copy of the raw data before applying ICA raw_ica = raw.copy() diff --git a/benchmarks/MOABB/validate_ica.py b/benchmarks/MOABB/validate_ica.py index 3fbcb25ed..64469e3df 100644 --- a/benchmarks/MOABB/validate_ica.py +++ b/benchmarks/MOABB/validate_ica.py @@ -6,6 +6,9 @@ import time import mne import moabb +import logging +from pathlib import Path +from datetime import datetime from moabb.datasets import BNCI2014_001 from memory_profiler import profile @@ -16,14 +19,55 @@ mne.set_log_level(verbose=False) moabb.set_log_level(level="ERROR") - -def test_ica_method(method: str, n_components: int = 15, **kwargs): +# Configure logging +def setup_logging(): + """Set up logging to both file and console.""" + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + log_dir = Path("logs") + log_dir.mkdir(exist_ok=True) + log_file = log_dir / f"ica_benchmark_{timestamp}.log" + + # Configure logging format + formatter = logging.Formatter('%(asctime)s - %(message)s') + + # File handler + file_handler = logging.FileHandler(log_file) + file_handler.setFormatter(formatter) + + # Console handler + console_handler = logging.StreamHandler() + console_handler.setFormatter(formatter) + + # Set up logger + logger = logging.getLogger('ICA_benchmark') + logger.setLevel(logging.INFO) + logger.addHandler(file_handler) + logger.addHandler(console_handler) + + return logger + +logger = setup_logging() + +def test_ica_method( + method: str, + n_components: int = 15, + use_hash: bool = True, + **kwargs +): """Test a specific ICA method and return timing results.""" - print(f"\nTesting ICA method: {method}") + logger.info(f"\nTesting ICA method: {method} (use_hash={use_hash})") + + start = time.time() ica_processor = ICAProcessor( - n_components=n_components, method=method, **kwargs + n_components=n_components, + method=method, + use_hash=use_hash, + **kwargs ) + time_init = time.time() - start + logger.info(f"Time to create processor: {time_init:.4f}s") + start = time.time() dataset = EpochedEEGDataset.from_moabb( BNCI2014_001(), f"data/MNE-BIDS-bnci2014-001-epoched-{method}.json", @@ -32,39 +76,43 @@ def test_ica_method(method: str, n_components: int = 15, **kwargs): tmax=4.0, preload=True, output_keys=["label", "subject", "session", "epoch"], - #ica_processor=ica_processor, dynamic_items=[ica_processor.dynamic_item] ) + time_create = time.time() - start + logger.info(f"Time to create dataset: {time_create:.2f}s") # First run - ICA computation - print("First run (computing ICA):") + logger.info("First run (computing ICA):") start = time.time() for _ in dataset: pass computation_time = time.time() - start - print(f"Time with {method} ICA (first run): {computation_time:.2f}s") + logger.info(f"Time with {method} ICA (first run): {computation_time:.2f}s") # Second run - using cached ICA - print("\nSecond run (using cached ICA):") + logger.info("\nSecond run (using cached ICA):") start = time.time() for _ in dataset: pass cached_time = time.time() - start - print(f"Time with {method} ICA (cached): {cached_time:.2f}s") + logger.info(f"Time with {method} ICA (cached): {cached_time:.2f}s") # Memory-cached version - print("\nTesting with InMemoryDataset wrapper:") + logger.info("\nTesting with InMemoryDataset wrapper:") dataset_cached = InMemoryDataset(dataset) start = time.time() for _ in dataset_cached: pass memory_cached_time = time.time() - start - print( + logger.info( f"Time with {method} ICA (in-memory cache): {memory_cached_time:.2f}s" ) return { "method": method, + "use_hash": use_hash, + "init_time": time_init, + "create_time": time_create, "computation_time": computation_time, "cached_time": cached_time, "memory_cached_time": memory_cached_time, @@ -73,7 +121,7 @@ def test_ica_method(method: str, n_components: int = 15, **kwargs): def compare_ica_methods(): # Test without ICA first as baseline - print("\nTesting without ICA (baseline):") + logger.info("\nTesting without ICA (baseline):") dataset_no_ica = EpochedEEGDataset.from_moabb( BNCI2014_001(), "data/MNE-BIDS-bnci2014-001-epoched.json", @@ -87,67 +135,81 @@ def compare_ica_methods(): for _ in dataset_no_ica: pass baseline_time = time.time() - start - print(f"Time without ICA: {baseline_time:.2f}s") + logger.info(f"Time without ICA: {baseline_time:.2f}s") # Test different ICA methods results = [] - # Test Picard - results.append( - test_ica_method("picard", n_components=15, fit_params={"max_iter": 500}) - ) + # Test Picard with and without hash + for use_hash in [True, False]: + results.append( + test_ica_method( + "picard", + n_components=15, + use_hash=use_hash, + fit_params={"max_iter": 500}, + filter_params={"l_freq": 1.0, "h_freq": None}, + ) + ) - # Test Infomax - results.append( - test_ica_method( - "infomax", n_components=15, fit_params={"max_iter": 1000} + # Test Infomax with and without hash + for use_hash in [True, False]: + results.append( + test_ica_method( + "infomax", + n_components=15, + use_hash=use_hash, + fit_params={"max_iter": 1000}, + filter_params={"l_freq": 1.0, "h_freq": None}, + ) ) - ) # Print comparison - print("\nComparison Summary:") - print("-" * 50) - print(f"Baseline (no ICA): {baseline_time:.2f}s") - print("-" * 50) + logger.info("\nComparison Summary:") + logger.info("-" * 70) + logger.info(f"Baseline (no ICA): {baseline_time:.2f}s") + logger.info("-" * 70) for result in results: - print(f"Method: {result['method']}") - print(f" Computation time: {result['computation_time']:.2f}s") - print(f" Cached access time: {result['cached_time']:.2f}s") - print(f" In-memory cached time: {result['memory_cached_time']:.2f}s") - print("-" * 50) + logger.info(f"Method: {result['method']} (use_hash={result['use_hash']})") + logger.info(f" Initialization time: {result['init_time']:.4f}s") + logger.info(f" Dataset creation time: {result['create_time']:.2f}s") + logger.info(f" Computation time: {result['computation_time']:.2f}s") + logger.info(f" Cached access time: {result['cached_time']:.2f}s") + logger.info(f" In-memory cached time: {result['memory_cached_time']:.2f}s") + logger.info("-" * 70) @profile def profile_memory_usage(): - # Profile memory usage for both methods + # Profile memory usage for both methods with and without hash for method in ["picard", "infomax"]: - print(f"\nProfiling {method} ICA:") - ica_processor = ICAProcessor( - n_components=15, - method=method, - fit_params={"max_iter": 500} - if method == "picard" - else {"max_iter": 1000}, - ) - dataset = EpochedEEGDataset.from_moabb( - BNCI2014_001(), - f"data/MNE-BIDS-bnci2014-001-epoched-{method}.json", - save_path="data", - tmin=0, - tmax=4.0, - preload=True, - output_keys=["label", "subject", "session", "epoch"], - #ica_processor=ica_processor, - dynamic_items=[ica_processor.dynamic_item] - ) - - for _ in dataset: - pass + for use_hash in [True, False]: + logger.info(f"\nProfiling {method} ICA (use_hash={use_hash}):") + ica_processor = ICAProcessor( + n_components=15, + method=method, + use_hash=use_hash, + fit_params={"max_iter": 500 if method == "picard" else 1000}, + filter_params={"l_freq": 1.0, "h_freq": None}, + ) + dataset = EpochedEEGDataset.from_moabb( + BNCI2014_001(), + f"data/MNE-BIDS-bnci2014-001-epoched-{method}.json", + save_path="data", + tmin=0, + tmax=4.0, + preload=True, + output_keys=["label", "subject", "session", "epoch"], + dynamic_items=[ica_processor.dynamic_item] + ) + + for _ in dataset: + pass if __name__ == "__main__": - print("Running ICA method comparison...") + logger.info("Running ICA method comparison...") compare_ica_methods() - print("\nRunning memory profile...") - profile_memory_usage() + logger.info("\nRunning memory profile...") + profile_memory_usage() \ No newline at end of file From b592c606981f4c5707bb36b8fc1d4c55b4556849 Mon Sep 17 00:00:00 2001 From: vmcru Date: Sun, 23 Mar 2025 14:36:35 -0400 Subject: [PATCH 11/36] precommit mods --- benchmarks/MOABB/dataio/datasets.py | 6 ++--- benchmarks/MOABB/dataio/ica.py | 38 ++++++++++++++--------------- 2 files changed, 21 insertions(+), 23 deletions(-) diff --git a/benchmarks/MOABB/dataio/datasets.py b/benchmarks/MOABB/dataio/datasets.py index 995039f6a..03a2790ea 100644 --- a/benchmarks/MOABB/dataio/datasets.py +++ b/benchmarks/MOABB/dataio/datasets.py @@ -24,8 +24,6 @@ from speechbrain.dataio.dataset import DynamicItemDataset from speechbrain.utils.data_pipeline import provides, takes -from .ica import ICAProcessor - class RawEEGSample(TypedDict, total=False): """Default dictionary keys provided by `~RawEEGDataset`. @@ -97,12 +95,12 @@ def __init__( data, preload=False, verbose=None, - #ica_processor: Optional[ICAProcessor] = None, + # ica_processor: Optional[ICAProcessor] = None, dynamic_items=(), output_keys=(), ): self.verbose = verbose - #self.ica_processor = ica_processor + # self.ica_processor = ica_processor dynamic_items = [self._make_load_raw_dynamic_item(preload)] + list( dynamic_items ) diff --git a/benchmarks/MOABB/dataio/ica.py b/benchmarks/MOABB/dataio/ica.py index 00b45aea0..7af5feaa5 100644 --- a/benchmarks/MOABB/dataio/ica.py +++ b/benchmarks/MOABB/dataio/ica.py @@ -14,6 +14,7 @@ from speechbrain.utils.data_pipeline import provides, takes + class ICAProcessor: """Handles ICA computation and application for EEG data. @@ -55,13 +56,15 @@ def _get_params_hash(self) -> str: # Select critical parameters that affect the ICA computation # not accessible from ICA object for standarization critical_params = { - 'n_components': self.n_components, - 'method': self.method, - 'filter_params': self.filter_params + "n_components": self.n_components, + "method": self.method, + "filter_params": self.filter_params, } # Create a deterministic string representation and hash it param_str = json.dumps(critical_params, sort_keys=True) - return hashlib.md5(param_str.encode()).hexdigest()[:8] # First 8 chars are enough + return hashlib.md5(param_str.encode()).hexdigest()[ + :8 + ] # First 8 chars are enough def get_ica_metadata(self) -> Dict: """ Generate metadata dictionary for the ICA parameters. """ @@ -70,7 +73,7 @@ def get_ica_metadata(self) -> Dict: "method": self.method, "random_state": self.random_state, "filter_params": self.filter_params, - "fit_params": self.fit_params + "fit_params": self.fit_params, } def get_ica_path(self, raw_path: Union[str, Path]) -> tuple[Path, Path]: @@ -87,15 +90,13 @@ def get_ica_path(self, raw_path: Union[str, Path]) -> tuple[Path, Path]: if self.use_hash: param_hash = self._get_params_hash() folder_name = f"ica-{self.method}-{param_hash}" - desc = f"ica{self.method}" + # desc = f"ica{self.method}" else: folder_name = f"ica{self.method}" - desc = f"ica-{self.method}" + # desc = f"ica-{self.method}" # For derivatives, you can put them in a derivatives folder: - bids_path.root = ( - bids_path.root / ".." / "derivatives" / folder_name - ) + bids_path.root = bids_path.root / ".." / "derivatives" / folder_name # Keep the same base entities: bids_path.update( suffix="eeg", # override or confirm suffix @@ -107,7 +108,7 @@ def get_ica_path(self, raw_path: Union[str, Path]) -> tuple[Path, Path]: bids_path.fpath.parent.mkdir(parents=True, exist_ok=True) ica_path = bids_path.fpath - metadata_path = ica_path.with_suffix('.json') + metadata_path = ica_path.with_suffix(".json") return ica_path, metadata_path @@ -115,19 +116,19 @@ def save_ica(self, ica: ICA, ica_path: Path, metadata_path: Path): """Save ICA solution and metadata to disk.""" # Save ICA solution ica.save(ica_path, overwrite=True) - + # Save metadata - with metadata_path.open('w') as f: + with metadata_path.open("w") as f: json.dump(self.get_ica_metadata(), f) def check_ica_metadata(self, metadata_path: Path) -> bool: """Check if existing ICA metadata matches current parameters.""" if not metadata_path.exists(): return False - + with metadata_path.open() as f: saved_metadata = json.load(f) - + current_metadata = self.get_ica_metadata() return saved_metadata == current_metadata @@ -151,14 +152,12 @@ def compute_ica(self, raw: mne.io.RawArray, ica_path: Path) -> ICA: def dynamic_item(self): @takes("raw", "fpath") @provides("raw", "ica_path") - def process( - raw: mne.io.RawArray, fpath: Union[str, Path] - ): + def process(raw: mne.io.RawArray, fpath: Union[str, Path]): """Process raw data with ICA, computing or loading from cache.""" ica_path, metadata_path = self.get_ica_path(fpath) - if ica_path.exists() and self.check_ica_metadata(metadata_path): + if ica_path.exists() and self.check_ica_metadata(metadata_path): ica = mne.preprocessing.read_ica(ica_path, verbose="ERROR") else: ica = self.compute_ica(raw, ica_path) @@ -170,4 +169,5 @@ def process( yield raw_ica yield ica_path + return process From e2973aa58d62d4d4687431f1fafa816fd633811d Mon Sep 17 00:00:00 2001 From: vmcru Date: Sun, 23 Mar 2025 15:14:09 -0400 Subject: [PATCH 12/36] formatting fix --- benchmarks/MOABB/dataio/ica.py | 2 -- benchmarks/MOABB/validate_ica.py | 44 +++++++++++++++++--------------- 2 files changed, 23 insertions(+), 23 deletions(-) diff --git a/benchmarks/MOABB/dataio/ica.py b/benchmarks/MOABB/dataio/ica.py index 7af5feaa5..4e4db01f1 100644 --- a/benchmarks/MOABB/dataio/ica.py +++ b/benchmarks/MOABB/dataio/ica.py @@ -90,10 +90,8 @@ def get_ica_path(self, raw_path: Union[str, Path]) -> tuple[Path, Path]: if self.use_hash: param_hash = self._get_params_hash() folder_name = f"ica-{self.method}-{param_hash}" - # desc = f"ica{self.method}" else: folder_name = f"ica{self.method}" - # desc = f"ica-{self.method}" # For derivatives, you can put them in a derivatives folder: bids_path.root = bids_path.root / ".." / "derivatives" / folder_name diff --git a/benchmarks/MOABB/validate_ica.py b/benchmarks/MOABB/validate_ica.py index 64469e3df..6c2d3273e 100644 --- a/benchmarks/MOABB/validate_ica.py +++ b/benchmarks/MOABB/validate_ica.py @@ -19,50 +19,48 @@ mne.set_log_level(verbose=False) moabb.set_log_level(level="ERROR") + # Configure logging def setup_logging(): """Set up logging to both file and console.""" + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") log_dir = Path("logs") log_dir.mkdir(exist_ok=True) log_file = log_dir / f"ica_benchmark_{timestamp}.log" - + # Configure logging format - formatter = logging.Formatter('%(asctime)s - %(message)s') - + formatter = logging.Formatter("%(asctime)s - %(message)s") + # File handler file_handler = logging.FileHandler(log_file) file_handler.setFormatter(formatter) - + # Console handler console_handler = logging.StreamHandler() console_handler.setFormatter(formatter) - + # Set up logger - logger = logging.getLogger('ICA_benchmark') + logger = logging.getLogger("ICA_benchmark") logger.setLevel(logging.INFO) logger.addHandler(file_handler) logger.addHandler(console_handler) - + return logger + logger = setup_logging() + def test_ica_method( - method: str, - n_components: int = 15, - use_hash: bool = True, - **kwargs + method: str, n_components: int = 15, use_hash: bool = True, **kwargs ): """Test a specific ICA method and return timing results.""" logger.info(f"\nTesting ICA method: {method} (use_hash={use_hash})") - + start = time.time() ica_processor = ICAProcessor( - n_components=n_components, - method=method, - use_hash=use_hash, - **kwargs + n_components=n_components, method=method, use_hash=use_hash, **kwargs ) time_init = time.time() - start logger.info(f"Time to create processor: {time_init:.4f}s") @@ -76,7 +74,7 @@ def test_ica_method( tmax=4.0, preload=True, output_keys=["label", "subject", "session", "epoch"], - dynamic_items=[ica_processor.dynamic_item] + dynamic_items=[ica_processor.dynamic_item], ) time_create = time.time() - start logger.info(f"Time to create dataset: {time_create:.2f}s") @@ -170,12 +168,16 @@ def compare_ica_methods(): logger.info(f"Baseline (no ICA): {baseline_time:.2f}s") logger.info("-" * 70) for result in results: - logger.info(f"Method: {result['method']} (use_hash={result['use_hash']})") + logger.info( + f"Method: {result['method']} (use_hash={result['use_hash']})" + ) logger.info(f" Initialization time: {result['init_time']:.4f}s") logger.info(f" Dataset creation time: {result['create_time']:.2f}s") logger.info(f" Computation time: {result['computation_time']:.2f}s") logger.info(f" Cached access time: {result['cached_time']:.2f}s") - logger.info(f" In-memory cached time: {result['memory_cached_time']:.2f}s") + logger.info( + f" In-memory cached time: {result['memory_cached_time']:.2f}s" + ) logger.info("-" * 70) @@ -200,7 +202,7 @@ def profile_memory_usage(): tmax=4.0, preload=True, output_keys=["label", "subject", "session", "epoch"], - dynamic_items=[ica_processor.dynamic_item] + dynamic_items=[ica_processor.dynamic_item], ) for _ in dataset: @@ -212,4 +214,4 @@ def profile_memory_usage(): compare_ica_methods() logger.info("\nRunning memory profile...") - profile_memory_usage() \ No newline at end of file + profile_memory_usage() From 3f8b646e09ea869f0937faf38846b8ba99b77aa6 Mon Sep 17 00:00:00 2001 From: vmcru Date: Sun, 23 Mar 2025 15:32:54 -0400 Subject: [PATCH 13/36] optional filtering added --- benchmarks/MOABB/dataio/ica.py | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/benchmarks/MOABB/dataio/ica.py b/benchmarks/MOABB/dataio/ica.py index 4e4db01f1..0eeb4ff2d 100644 --- a/benchmarks/MOABB/dataio/ica.py +++ b/benchmarks/MOABB/dataio/ica.py @@ -32,6 +32,7 @@ class ICAProcessor: See mne.preprocessing.ICA for details. filter_params : dict | None Parameters for the high-pass filter applied before ICA. + Set to None to skip filtering if data is already filtered. Defaults to {'l_freq': 1.0, 'h_freq': None} """ @@ -131,10 +132,19 @@ def check_ica_metadata(self, metadata_path: Path) -> bool: return saved_metadata == current_metadata def compute_ica(self, raw: mne.io.RawArray, ica_path: Path) -> ICA: - """Compute ICA solution and save to disk.""" - # High-pass filter for ICA - raw_filtered = raw.copy() - raw_filtered.filter(**self.filter_params) + """Compute ICA solution and save to disk. + + If filter_params is provided, applies a high-pass filter before ICA computation. + This step can be skipped if the data is already filtered by setting + filter_params to None during ICAProcessor initialization. + """ + if self.filter_params is not None: + # Apply high-pass filter only if filter parameters are provided + raw_filtered = raw.copy() + raw_filtered.filter(**self.filter_params) + else: + # Use raw data directly if no filtering is needed + raw_filtered = raw ica = ICA( n_components=self.n_components, From 73d1e93cd5e92f4fbd17a437efc27bd932f4500b Mon Sep 17 00:00:00 2001 From: vmcru Date: Sun, 23 Mar 2025 15:50:52 -0400 Subject: [PATCH 14/36] added hashing to description name. --- benchmarks/MOABB/dataio/ica.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/benchmarks/MOABB/dataio/ica.py b/benchmarks/MOABB/dataio/ica.py index 0eeb4ff2d..0c874a7ef 100644 --- a/benchmarks/MOABB/dataio/ica.py +++ b/benchmarks/MOABB/dataio/ica.py @@ -91,8 +91,10 @@ def get_ica_path(self, raw_path: Union[str, Path]) -> tuple[Path, Path]: if self.use_hash: param_hash = self._get_params_hash() folder_name = f"ica-{self.method}-{param_hash}" + desc = f"ica{param_hash}" else: folder_name = f"ica{self.method}" + desc = f"ica" # For derivatives, you can put them in a derivatives folder: bids_path.root = bids_path.root / ".." / "derivatives" / folder_name @@ -100,7 +102,7 @@ def get_ica_path(self, raw_path: Union[str, Path]) -> tuple[Path, Path]: bids_path.update( suffix="eeg", # override or confirm suffix extension=".fif", - description="ica", # <-- This sets a desc=ica entity + description=desc, # <-- This sets a desc=ica entity check=True, # If you do not want BIDSPath to fail on derivative checks ) # Make sure the folder is created From ade6b8db5c933b1cab6c96634b72cbbdb8e8a928 Mon Sep 17 00:00:00 2001 From: vmcru Date: Sun, 23 Mar 2025 18:43:01 -0400 Subject: [PATCH 15/36] added python-picard dependency in extra-requirements.txt for ica picard computation --- benchmarks/MOABB/extra-requirements.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/benchmarks/MOABB/extra-requirements.txt b/benchmarks/MOABB/extra-requirements.txt index 950f45760..ca20ff01f 100644 --- a/benchmarks/MOABB/extra-requirements.txt +++ b/benchmarks/MOABB/extra-requirements.txt @@ -2,3 +2,4 @@ git+https://github.com/braindecode/braindecode moabb orion[profet] scikit-learn +python-picard \ No newline at end of file From 8b6633ee8d4ef4c42225f1e42e4b2aa2c1c74788 Mon Sep 17 00:00:00 2001 From: vmcru Date: Sun, 23 Mar 2025 19:12:23 -0400 Subject: [PATCH 16/36] format fix extra-requirements.txt --- benchmarks/MOABB/extra-requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/benchmarks/MOABB/extra-requirements.txt b/benchmarks/MOABB/extra-requirements.txt index ca20ff01f..641e7c55e 100644 --- a/benchmarks/MOABB/extra-requirements.txt +++ b/benchmarks/MOABB/extra-requirements.txt @@ -1,5 +1,5 @@ git+https://github.com/braindecode/braindecode moabb orion[profet] +python-picard scikit-learn -python-picard \ No newline at end of file From a1031d2ab9e5cef4af461a7e26ab4be823d8b943 Mon Sep 17 00:00:00 2001 From: Victor C <55927568+vmcru@users.noreply.github.com> Date: Tue, 25 Mar 2025 17:53:55 -0400 Subject: [PATCH 17/36] Update benchmarks/MOABB/dataio/ica.py description update Co-authored-by: Bru --- benchmarks/MOABB/dataio/ica.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/benchmarks/MOABB/dataio/ica.py b/benchmarks/MOABB/dataio/ica.py index 0c874a7ef..11454cfc0 100644 --- a/benchmarks/MOABB/dataio/ica.py +++ b/benchmarks/MOABB/dataio/ica.py @@ -56,7 +56,7 @@ def _get_params_hash(self) -> str: """Generate a short hash of the ICA parameters.""" # Select critical parameters that affect the ICA computation # not accessible from ICA object for standarization - critical_params = { + base_params = { "n_components": self.n_components, "method": self.method, "filter_params": self.filter_params, From 282c0167fb7ba4dafe20fef2478ca6736067b938 Mon Sep 17 00:00:00 2001 From: Victor C <55927568+vmcru@users.noreply.github.com> Date: Tue, 25 Mar 2025 17:54:13 -0400 Subject: [PATCH 18/36] Update benchmarks/MOABB/dataio/ica.py description update 2 Co-authored-by: Bru --- benchmarks/MOABB/dataio/ica.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/benchmarks/MOABB/dataio/ica.py b/benchmarks/MOABB/dataio/ica.py index 11454cfc0..1ee11d621 100644 --- a/benchmarks/MOABB/dataio/ica.py +++ b/benchmarks/MOABB/dataio/ica.py @@ -62,7 +62,7 @@ def _get_params_hash(self) -> str: "filter_params": self.filter_params, } # Create a deterministic string representation and hash it - param_str = json.dumps(critical_params, sort_keys=True) + param_str = json.dumps(base_params, sort_keys=True) return hashlib.md5(param_str.encode()).hexdigest()[ :8 ] # First 8 chars are enough From ad2e39de17cf5f101752f66dfd40be19b73e914b Mon Sep 17 00:00:00 2001 From: Victor C <55927568+vmcru@users.noreply.github.com> Date: Tue, 25 Mar 2025 22:10:39 -0400 Subject: [PATCH 19/36] Update benchmarks/MOABB/dataio/datasets.py Co-authored-by: Bru --- benchmarks/MOABB/dataio/datasets.py | 1 - 1 file changed, 1 deletion(-) diff --git a/benchmarks/MOABB/dataio/datasets.py b/benchmarks/MOABB/dataio/datasets.py index 03a2790ea..392cc2925 100644 --- a/benchmarks/MOABB/dataio/datasets.py +++ b/benchmarks/MOABB/dataio/datasets.py @@ -95,7 +95,6 @@ def __init__( data, preload=False, verbose=None, - # ica_processor: Optional[ICAProcessor] = None, dynamic_items=(), output_keys=(), ): From 822dc46fe947bd6c41399bf070c4bc8590221ca5 Mon Sep 17 00:00:00 2001 From: vmcru Date: Wed, 26 Mar 2025 14:20:26 -0400 Subject: [PATCH 20/36] renamic critical to base and removing unnecessary comments --- benchmarks/MOABB/dataio/datasets.py | 1 - 1 file changed, 1 deletion(-) diff --git a/benchmarks/MOABB/dataio/datasets.py b/benchmarks/MOABB/dataio/datasets.py index 392cc2925..bcf333d47 100644 --- a/benchmarks/MOABB/dataio/datasets.py +++ b/benchmarks/MOABB/dataio/datasets.py @@ -99,7 +99,6 @@ def __init__( output_keys=(), ): self.verbose = verbose - # self.ica_processor = ica_processor dynamic_items = [self._make_load_raw_dynamic_item(preload)] + list( dynamic_items ) From c524d11be97cd78201f95f2ee5c9f70fc12901e4 Mon Sep 17 00:00:00 2001 From: vmcru Date: Wed, 26 Mar 2025 14:38:05 -0400 Subject: [PATCH 21/36] tests upgrading pytest to see if it fixes breaks --- .github/workflows/tests.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index f81e63fb0..5852c66db 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -32,6 +32,8 @@ jobs: pip install uv uv pip install --system -r requirements.txt cd benchmarks/MOABB && uv pip install --system -r extra-requirements.txt + - name: Upgrade pytest to a version compatible with Python 3.11 + run: pip install --upgrade pytest - name: Display Python version run: python -c "import sys; print(sys.version)" - name: Consistency tests with pytest From a6fb93306d2eadd5d56c3779824af97725f17f9a Mon Sep 17 00:00:00 2001 From: vmcru Date: Wed, 26 Mar 2025 14:46:32 -0400 Subject: [PATCH 22/36] added docstrings to process andn dynamic items functions. --- benchmarks/MOABB/dataio/ica.py | 42 +++++++++++++++++++++++++++++++++- 1 file changed, 41 insertions(+), 1 deletion(-) diff --git a/benchmarks/MOABB/dataio/ica.py b/benchmarks/MOABB/dataio/ica.py index 1ee11d621..b1e07448c 100644 --- a/benchmarks/MOABB/dataio/ica.py +++ b/benchmarks/MOABB/dataio/ica.py @@ -160,10 +160,50 @@ def compute_ica(self, raw: mne.io.RawArray, ica_path: Path) -> ICA: @property def dynamic_item(self): + """Creates a dynamic pipeline item for ICA processing. + + This property creates a function that can be used as a dynamic item in a + SpeechBrain pipeline. The function handles: + 1. Loading or computing ICA solutions + 2. Applying ICA to the raw data + 3. Caching results to disk + + The pipeline item: + Takes: + - raw (mne.io.RawArray): The raw EEG data + - fpath (Union[str, Path]): Path to the raw data file + + Provides: + - raw (mne.io.RawArray): The ICA-processed EEG data + - ica_path (Path): Path to the saved ICA solution + + Returns + ------- + callable + A function that can be used as a dynamic item in a SpeechBrain pipeline. + """ @takes("raw", "fpath") @provides("raw", "ica_path") def process(raw: mne.io.RawArray, fpath: Union[str, Path]): - """Process raw data with ICA, computing or loading from cache.""" + """Process raw data with ICA, computing or loading from cache. + + Checks for existing ICA solution in cache. If found and valid, loads and + applies it. Otherwise, computes new ICA solution, saves it, and applies it. + + Arguments + --------- + raw : mne.io.RawArray + The raw EEG data to process + fpath : Union[str, Path] + Path to the raw data file, used to generate cache paths + + Yields + ------ + mne.io.RawArray + The ICA-processed EEG data + Path + Path to the saved ICA solution + """ ica_path, metadata_path = self.get_ica_path(fpath) From 544e6381e1864d71db9776e0d5e20846d3b19fb8 Mon Sep 17 00:00:00 2001 From: vmcru Date: Wed, 26 Mar 2025 14:48:58 -0400 Subject: [PATCH 23/36] formatting fixes --- benchmarks/MOABB/dataio/ica.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/benchmarks/MOABB/dataio/ica.py b/benchmarks/MOABB/dataio/ica.py index b1e07448c..c8285a228 100644 --- a/benchmarks/MOABB/dataio/ica.py +++ b/benchmarks/MOABB/dataio/ica.py @@ -161,33 +161,34 @@ def compute_ica(self, raw: mne.io.RawArray, ica_path: Path) -> ICA: @property def dynamic_item(self): """Creates a dynamic pipeline item for ICA processing. - - This property creates a function that can be used as a dynamic item in a + + This property creates a function that can be used as a dynamic item in a SpeechBrain pipeline. The function handles: 1. Loading or computing ICA solutions 2. Applying ICA to the raw data 3. Caching results to disk - + The pipeline item: Takes: - raw (mne.io.RawArray): The raw EEG data - fpath (Union[str, Path]): Path to the raw data file - + Provides: - raw (mne.io.RawArray): The ICA-processed EEG data - ica_path (Path): Path to the saved ICA solution - + Returns ------- callable A function that can be used as a dynamic item in a SpeechBrain pipeline. """ + @takes("raw", "fpath") @provides("raw", "ica_path") def process(raw: mne.io.RawArray, fpath: Union[str, Path]): """Process raw data with ICA, computing or loading from cache. - Checks for existing ICA solution in cache. If found and valid, loads and + Checks for existing ICA solution in cache. If found and valid, loads and applies it. Otherwise, computes new ICA solution, saves it, and applies it. Arguments From 77daeeb9c02962788d35eb166911976d0c3b9d0f Mon Sep 17 00:00:00 2001 From: vmcru Date: Wed, 26 Mar 2025 15:09:37 -0400 Subject: [PATCH 24/36] docstring fix --- benchmarks/MOABB/dataio/ica.py | 134 +++++++++++++++++++++++---------- 1 file changed, 96 insertions(+), 38 deletions(-) diff --git a/benchmarks/MOABB/dataio/ica.py b/benchmarks/MOABB/dataio/ica.py index c8285a228..ea1cc5b53 100644 --- a/benchmarks/MOABB/dataio/ica.py +++ b/benchmarks/MOABB/dataio/ica.py @@ -34,6 +34,17 @@ class ICAProcessor: Parameters for the high-pass filter applied before ICA. Set to None to skip filtering if data is already filtered. Defaults to {'l_freq': 1.0, 'h_freq': None} + + Example + ------- + >>> raw = mne.io.RawArray(data, info) # Create some MNE raw data + >>> ica_processor = ICAProcessor( + ... n_components=15, + ... method="picard", + ... fit_params={"max_iter": 500} + ... ) + >>> # Use in a SpeechBrain pipeline + >>> pipeline.add_dynamic_item(ica_processor.dynamic_item) """ def __init__( @@ -53,7 +64,18 @@ def __init__( self.use_hash = use_hash def _get_params_hash(self) -> str: - """Generate a short hash of the ICA parameters.""" + """Generate a short hash of the ICA parameters. + + Arguments + --------- + None + Uses instance attributes n_components, method, and filter_params. + + Returns + ------- + str + 8-character hexadecimal hash of the parameters. + """ # Select critical parameters that affect the ICA computation # not accessible from ICA object for standarization base_params = { @@ -68,7 +90,23 @@ def _get_params_hash(self) -> str: ] # First 8 chars are enough def get_ica_metadata(self) -> Dict: - """ Generate metadata dictionary for the ICA parameters. """ + """Generate metadata dictionary for the ICA parameters. + + Arguments + --------- + None + Uses instance attributes. + + Returns + ------- + dict + Dictionary containing all ICA parameters: + - n_components + - method + - random_state + - filter_params + - fit_params + """ return { "n_components": self.n_components, "method": self.method, @@ -80,11 +118,16 @@ def get_ica_metadata(self) -> Dict: def get_ica_path(self, raw_path: Union[str, Path]) -> tuple[Path, Path]: """Generate path where ICA solution should be stored. - Creates a derivatives folder to store ICA solutions, following BIDS conventions. + Arguments + --------- + raw_path : str | Path + Path to the raw data file. + Returns ------- tuple[Path, Path] - Returns (ica_path, metadata_path) + - Path to ICA solution file + - Path to metadata JSON file """ bids_path = get_bids_path_from_fname(raw_path) @@ -114,7 +157,21 @@ def get_ica_path(self, raw_path: Union[str, Path]) -> tuple[Path, Path]: return ica_path, metadata_path def save_ica(self, ica: ICA, ica_path: Path, metadata_path: Path): - """Save ICA solution and metadata to disk.""" + """Save ICA solution and metadata to disk. + + Arguments + --------- + ica : mne.preprocessing.ICA + The ICA solution to save. + ica_path : Path + Path where to save the ICA solution. + metadata_path : Path + Path where to save the metadata JSON. + + Returns + ------- + None + """ # Save ICA solution ica.save(ica_path, overwrite=True) @@ -123,7 +180,19 @@ def save_ica(self, ica: ICA, ica_path: Path, metadata_path: Path): json.dump(self.get_ica_metadata(), f) def check_ica_metadata(self, metadata_path: Path) -> bool: - """Check if existing ICA metadata matches current parameters.""" + """Check if existing ICA metadata matches current parameters. + + Arguments + --------- + metadata_path : Path + Path to the metadata JSON file to check. + + Returns + ------- + bool + True if metadata exists and matches current parameters, + False otherwise. + """ if not metadata_path.exists(): return False @@ -139,6 +208,18 @@ def compute_ica(self, raw: mne.io.RawArray, ica_path: Path) -> ICA: If filter_params is provided, applies a high-pass filter before ICA computation. This step can be skipped if the data is already filtered by setting filter_params to None during ICAProcessor initialization. + + Arguments + --------- + raw : mne.io.RawArray + The raw EEG data to process. + ica_path : Path + Path where to save the computed ICA solution. + + Returns + ------- + mne.preprocessing.ICA + The computed ICA solution. """ if self.filter_params is not None: # Apply high-pass filter only if filter parameters are provided @@ -162,49 +243,26 @@ def compute_ica(self, raw: mne.io.RawArray, ica_path: Path) -> ICA: def dynamic_item(self): """Creates a dynamic pipeline item for ICA processing. - This property creates a function that can be used as a dynamic item in a - SpeechBrain pipeline. The function handles: - 1. Loading or computing ICA solutions - 2. Applying ICA to the raw data - 3. Caching results to disk + Arguments + --------- + None + Uses instance methods and attributes. - The pipeline item: + Returns + ------- + callable + A function that: Takes: - raw (mne.io.RawArray): The raw EEG data - fpath (Union[str, Path]): Path to the raw data file - Provides: - raw (mne.io.RawArray): The ICA-processed EEG data - ica_path (Path): Path to the saved ICA solution - - Returns - ------- - callable - A function that can be used as a dynamic item in a SpeechBrain pipeline. """ - @takes("raw", "fpath") @provides("raw", "ica_path") def process(raw: mne.io.RawArray, fpath: Union[str, Path]): - """Process raw data with ICA, computing or loading from cache. - - Checks for existing ICA solution in cache. If found and valid, loads and - applies it. Otherwise, computes new ICA solution, saves it, and applies it. - - Arguments - --------- - raw : mne.io.RawArray - The raw EEG data to process - fpath : Union[str, Path] - Path to the raw data file, used to generate cache paths - - Yields - ------ - mne.io.RawArray - The ICA-processed EEG data - Path - Path to the saved ICA solution - """ + """Process raw data with ICA, computing or loading from cache.""" ica_path, metadata_path = self.get_ica_path(fpath) From 84e09dc0c489442d76dd9974f4e6785dfa4b4cd7 Mon Sep 17 00:00:00 2001 From: vmcru Date: Wed, 26 Mar 2025 15:17:35 -0400 Subject: [PATCH 25/36] docstring adaptations for validate_ica.py --- benchmarks/MOABB/validate_ica.py | 49 ++++++++++++++++++++++++++++++-- 1 file changed, 47 insertions(+), 2 deletions(-) diff --git a/benchmarks/MOABB/validate_ica.py b/benchmarks/MOABB/validate_ica.py index 6c2d3273e..665eedaa2 100644 --- a/benchmarks/MOABB/validate_ica.py +++ b/benchmarks/MOABB/validate_ica.py @@ -22,7 +22,16 @@ # Configure logging def setup_logging(): - """Set up logging to both file and console.""" + """Set up logging to both file and console. + + The logs are written to a file in the 'logs' directory, with a timestamp + in the filename. The logs are also printed to the console. + + Returns + ------- + logging.Logger + The configured logger instance. + """ timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") log_dir = Path("logs") @@ -55,7 +64,28 @@ def setup_logging(): def test_ica_method( method: str, n_components: int = 15, use_hash: bool = True, **kwargs ): - """Test a specific ICA method and return timing results.""" + """Test a specific ICA method and return timing results. + + This function creates an ICAProcessor, runs the EpochedEEGDataset with the + processor, and measures the time taken for various steps, including initial + ICA computation, caching, and in-memory caching. + + Arguments + --------- + method : str + The ICA method to test, either 'picard' or 'infomax'. + n_components : int, optional + The number of ICA components to use, by default 15. + use_hash : bool, optional + Whether to use parameter hashing for caching, by default True. + **kwargs + Additional parameters to pass to the ICAProcessor constructor. + + Returns + ------- + dict + A dictionary containing the timing results for the tested ICA method. + """ logger.info(f"\nTesting ICA method: {method} (use_hash={use_hash})") start = time.time() @@ -118,6 +148,12 @@ def test_ica_method( def compare_ica_methods(): + """Compare the performance of different ICA methods. + + This function tests the Picard and Infomax ICA methods, both with and without + parameter hashing for caching. It also tests the baseline performance without + any ICA processing. The results are logged to the console and the log file. + """ # Test without ICA first as baseline logger.info("\nTesting without ICA (baseline):") dataset_no_ica = EpochedEEGDataset.from_moabb( @@ -183,6 +219,11 @@ def compare_ica_methods(): @profile def profile_memory_usage(): + """Profile the memory usage of ICA processing. + + This function runs the ICA processing for both Picard and Infomax methods, + with and without parameter hashing, and profiles the memory usage. + """ # Profile memory usage for both methods with and without hash for method in ["picard", "infomax"]: for use_hash in [True, False]: @@ -210,6 +251,10 @@ def profile_memory_usage(): if __name__ == "__main__": + """Entry point for the ICA benchmark script. + + Runs the ICA method comparison and the memory usage profiling. + """ logger.info("Running ICA method comparison...") compare_ica_methods() From 96e25464d81e674ae527640fa2174227e1efe5cf Mon Sep 17 00:00:00 2001 From: vmcru Date: Wed, 26 Mar 2025 15:22:45 -0400 Subject: [PATCH 26/36] precommit fixes --- benchmarks/MOABB/dataio/ica.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/benchmarks/MOABB/dataio/ica.py b/benchmarks/MOABB/dataio/ica.py index ea1cc5b53..e989afe39 100644 --- a/benchmarks/MOABB/dataio/ica.py +++ b/benchmarks/MOABB/dataio/ica.py @@ -34,7 +34,7 @@ class ICAProcessor: Parameters for the high-pass filter applied before ICA. Set to None to skip filtering if data is already filtered. Defaults to {'l_freq': 1.0, 'h_freq': None} - + Example ------- >>> raw = mne.io.RawArray(data, info) # Create some MNE raw data @@ -158,7 +158,7 @@ def get_ica_path(self, raw_path: Union[str, Path]) -> tuple[Path, Path]: def save_ica(self, ica: ICA, ica_path: Path, metadata_path: Path): """Save ICA solution and metadata to disk. - + Arguments --------- ica : mne.preprocessing.ICA @@ -181,7 +181,7 @@ def save_ica(self, ica: ICA, ica_path: Path, metadata_path: Path): def check_ica_metadata(self, metadata_path: Path) -> bool: """Check if existing ICA metadata matches current parameters. - + Arguments --------- metadata_path : Path @@ -259,6 +259,7 @@ def dynamic_item(self): - raw (mne.io.RawArray): The ICA-processed EEG data - ica_path (Path): Path to the saved ICA solution """ + @takes("raw", "fpath") @provides("raw", "ica_path") def process(raw: mne.io.RawArray, fpath: Union[str, Path]): From 6e22fe5665cdb568c47b5b80f81c68d9fad53306 Mon Sep 17 00:00:00 2001 From: vmcru Date: Thu, 27 Mar 2025 22:28:17 -0400 Subject: [PATCH 27/36] rework of the metadata checking and storing --- benchmarks/MOABB/dataio/ica.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/benchmarks/MOABB/dataio/ica.py b/benchmarks/MOABB/dataio/ica.py index e989afe39..90ef989da 100644 --- a/benchmarks/MOABB/dataio/ica.py +++ b/benchmarks/MOABB/dataio/ica.py @@ -44,7 +44,7 @@ class ICAProcessor: ... fit_params={"max_iter": 500} ... ) >>> # Use in a SpeechBrain pipeline - >>> pipeline.add_dynamic_item(ica_processor.dynamic_item) + >>> # Dynammic item to be used in pipeline: ica_processor.dynamic_item """ def __init__( From ab42aa9fe34f1edec1b100fe25354187396bd0e5 Mon Sep 17 00:00:00 2001 From: vmcru Date: Thu, 27 Mar 2025 22:29:19 -0400 Subject: [PATCH 28/36] metadata changes --- benchmarks/MOABB/dataio/ica.py | 155 ++++++++++++++++++++++----------- 1 file changed, 105 insertions(+), 50 deletions(-) diff --git a/benchmarks/MOABB/dataio/ica.py b/benchmarks/MOABB/dataio/ica.py index 90ef989da..13f31ca98 100644 --- a/benchmarks/MOABB/dataio/ica.py +++ b/benchmarks/MOABB/dataio/ica.py @@ -7,6 +7,7 @@ from typing import Union, Optional, Dict, Any import json import hashlib +from datetime import datetime import mne from mne.preprocessing import ICA @@ -63,63 +64,102 @@ def __init__( self.filter_params = filter_params or {"l_freq": 1.0, "h_freq": None} self.use_hash = use_hash - def _get_params_hash(self) -> str: - """Generate a short hash of the ICA parameters. + + def _get_data_params(self, raw: mne.io.RawArray) -> Dict: + """Extract relevant parameters from raw.info. Arguments --------- - None - Uses instance attributes n_components, method, and filter_params. + raw : mne.io.RawArray + The raw EEG data. + + Returns + ------- + dict + Dictionary containing relevant data parameters. + """ + return { + 'highpass': raw.info['highpass'], + 'lowpass': raw.info['lowpass'], + 'sfreq': raw.info['sfreq'], + 'n_channels': len(raw.info['ch_names']), + } + + def _get_ica_params(self) -> Dict: + """Get ICA-specific processing parameters. + + Returns + ------- + dict + Dictionary containing ICA processing parameters. + """ + return { + 'n_components': self.n_components, + 'method': self.method, + 'random_state': self.random_state, + 'fit_params': self.fit_params, + 'filter_params': self.filter_params, + } + + def _get_params_hash(self, raw: mne.io.RawArray) -> str: + """Generate hash based on both data and ICA parameters. + + Arguments + --------- + raw : mne.io.RawArray + The raw EEG data. Returns ------- str 8-character hexadecimal hash of the parameters. """ - # Select critical parameters that affect the ICA computation - # not accessible from ICA object for standarization - base_params = { - "n_components": self.n_components, - "method": self.method, - "filter_params": self.filter_params, + # Only include parameters that affect the ICA computation + hash_params = { + 'data_params': { + 'highpass': raw.info['highpass'], + 'lowpass': raw.info['lowpass'], + 'sfreq': raw.info['sfreq'], + 'n_channels': len(raw.info['ch_names']) + }, + 'ica_params': { + 'n_components': self.n_components, + 'method': self.method, + 'filter_params': self.filter_params + } } - # Create a deterministic string representation and hash it - param_str = json.dumps(base_params, sort_keys=True) - return hashlib.md5(param_str.encode()).hexdigest()[ - :8 - ] # First 8 chars are enough + param_str = json.dumps(hash_params, sort_keys=True) + return hashlib.md5(param_str.encode()).hexdigest()[:8] - def get_ica_metadata(self) -> Dict: - """Generate metadata dictionary for the ICA parameters. + def get_ica_metadata(self, raw: mne.io.RawArray) -> Dict: + """Generate complete metadata including both data and ICA parameters. Arguments --------- - None - Uses instance attributes. + raw : mne.io.RawArray + The raw EEG data. Returns ------- dict - Dictionary containing all ICA parameters: - - n_components - - method - - random_state - - filter_params - - fit_params + Complete metadata dictionary. """ return { - "n_components": self.n_components, - "method": self.method, - "random_state": self.random_state, - "filter_params": self.filter_params, - "fit_params": self.fit_params, + 'data_params': self._get_data_params(raw), + 'ica_params': self._get_ica_params(), + 'metadata': { + 'creation_date': datetime.now().isoformat(), + 'raw_filename': str(raw.filenames[0]) if raw.filenames else None + } } - def get_ica_path(self, raw_path: Union[str, Path]) -> tuple[Path, Path]: + def get_ica_path(self, raw: mne.io.RawArray, raw_path: Union[str, Path]) -> tuple[Path, Path]: """Generate path where ICA solution should be stored. Arguments --------- + raw : mne.io.RawArray + The raw EEG data. raw_path : str | Path Path to the raw data file. @@ -132,22 +172,24 @@ def get_ica_path(self, raw_path: Union[str, Path]) -> tuple[Path, Path]: bids_path = get_bids_path_from_fname(raw_path) if self.use_hash: - param_hash = self._get_params_hash() + param_hash = self._get_params_hash(raw) folder_name = f"ica-{self.method}-{param_hash}" desc = f"ica{param_hash}" else: folder_name = f"ica{self.method}" - desc = f"ica" + desc = "ica" # For derivatives, you can put them in a derivatives folder: bids_path.root = bids_path.root / ".." / "derivatives" / folder_name + # Keep the same base entities: bids_path.update( - suffix="eeg", # override or confirm suffix + suffix="eeg", extension=".fif", - description=desc, # <-- This sets a desc=ica entity - check=True, # If you do not want BIDSPath to fail on derivative checks + description=desc, + check=True, ) + # Make sure the folder is created bids_path.fpath.parent.mkdir(parents=True, exist_ok=True) @@ -156,7 +198,7 @@ def get_ica_path(self, raw_path: Union[str, Path]) -> tuple[Path, Path]: return ica_path, metadata_path - def save_ica(self, ica: ICA, ica_path: Path, metadata_path: Path): + def save_ica(self, ica: ICA, ica_path: Path, metadata_path: Path, raw: mne.io.RawArray): """Save ICA solution and metadata to disk. Arguments @@ -167,31 +209,35 @@ def save_ica(self, ica: ICA, ica_path: Path, metadata_path: Path): Path where to save the ICA solution. metadata_path : Path Path where to save the metadata JSON. + raw : mne.io.RawArray + The raw EEG data used for ICA. Returns ------- None - """ + """ # Save ICA solution ica.save(ica_path, overwrite=True) - # Save metadata + # Save metadata including data parameters + metadata = self.get_ica_metadata(raw) with metadata_path.open("w") as f: - json.dump(self.get_ica_metadata(), f) + json.dump(metadata, f, indent=2) - def check_ica_metadata(self, metadata_path: Path) -> bool: + def check_ica_metadata(self, raw: mne.io.RawArray, metadata_path: Path) -> bool: """Check if existing ICA metadata matches current parameters. - Arguments + Arguments --------- + raw : mne.io.RawArray + The raw EEG data to check against. metadata_path : Path - Path to the metadata JSON file to check. + Path to the metadata JSON file. Returns ------- bool - True if metadata exists and matches current parameters, - False otherwise. + True if metadata exists and matches both data and ICA parameters. """ if not metadata_path.exists(): return False @@ -199,8 +245,17 @@ def check_ica_metadata(self, metadata_path: Path) -> bool: with metadata_path.open() as f: saved_metadata = json.load(f) - current_metadata = self.get_ica_metadata() - return saved_metadata == current_metadata + # Check data parameters + current_data_params = self._get_data_params(raw) + if saved_metadata['data_params'] != current_data_params: + return False + + # Check ICA parameters + current_ica_params = self._get_ica_params() + if saved_metadata['ica_params'] != current_ica_params: + return False + + return True def compute_ica(self, raw: mne.io.RawArray, ica_path: Path) -> ICA: """Compute ICA solution and save to disk. @@ -265,13 +320,13 @@ def dynamic_item(self): def process(raw: mne.io.RawArray, fpath: Union[str, Path]): """Process raw data with ICA, computing or loading from cache.""" - ica_path, metadata_path = self.get_ica_path(fpath) + ica_path, metadata_path = self.get_ica_path(raw, fpath) - if ica_path.exists() and self.check_ica_metadata(metadata_path): + if ica_path.exists() and self.check_ica_metadata(raw, metadata_path): ica = mne.preprocessing.read_ica(ica_path, verbose="ERROR") else: ica = self.compute_ica(raw, ica_path) - self.save_ica(ica, ica_path, metadata_path) + self.save_ica(ica, ica_path, metadata_path, raw) # Create a copy of the raw data before applying ICA raw_ica = raw.copy() From 265212831e1aa048640ae52a4fd680d174f78b85 Mon Sep 17 00:00:00 2001 From: vmcru Date: Thu, 27 Mar 2025 22:45:12 -0400 Subject: [PATCH 29/36] precommit fixes --- benchmarks/MOABB/dataio/ica.py | 88 +++++++++++++++++++--------------- 1 file changed, 49 insertions(+), 39 deletions(-) diff --git a/benchmarks/MOABB/dataio/ica.py b/benchmarks/MOABB/dataio/ica.py index 13f31ca98..f40eb2a6e 100644 --- a/benchmarks/MOABB/dataio/ica.py +++ b/benchmarks/MOABB/dataio/ica.py @@ -45,7 +45,7 @@ class ICAProcessor: ... fit_params={"max_iter": 500} ... ) >>> # Use in a SpeechBrain pipeline - >>> # Dynammic item to be used in pipeline: ica_processor.dynamic_item + >>> # Dynammic item to be used in pipeline: ica_processor.dynamic_item """ def __init__( @@ -64,7 +64,6 @@ def __init__( self.filter_params = filter_params or {"l_freq": 1.0, "h_freq": None} self.use_hash = use_hash - def _get_data_params(self, raw: mne.io.RawArray) -> Dict: """Extract relevant parameters from raw.info. @@ -79,10 +78,10 @@ def _get_data_params(self, raw: mne.io.RawArray) -> Dict: Dictionary containing relevant data parameters. """ return { - 'highpass': raw.info['highpass'], - 'lowpass': raw.info['lowpass'], - 'sfreq': raw.info['sfreq'], - 'n_channels': len(raw.info['ch_names']), + "highpass": raw.info["highpass"], + "lowpass": raw.info["lowpass"], + "sfreq": raw.info["sfreq"], + "n_channels": len(raw.info["ch_names"]), } def _get_ica_params(self) -> Dict: @@ -94,11 +93,11 @@ def _get_ica_params(self) -> Dict: Dictionary containing ICA processing parameters. """ return { - 'n_components': self.n_components, - 'method': self.method, - 'random_state': self.random_state, - 'fit_params': self.fit_params, - 'filter_params': self.filter_params, + "n_components": self.n_components, + "method": self.method, + "random_state": self.random_state, + "fit_params": self.fit_params, + "filter_params": self.filter_params, } def _get_params_hash(self, raw: mne.io.RawArray) -> str: @@ -116,17 +115,17 @@ def _get_params_hash(self, raw: mne.io.RawArray) -> str: """ # Only include parameters that affect the ICA computation hash_params = { - 'data_params': { - 'highpass': raw.info['highpass'], - 'lowpass': raw.info['lowpass'], - 'sfreq': raw.info['sfreq'], - 'n_channels': len(raw.info['ch_names']) + "data_params": { + "highpass": raw.info["highpass"], + "lowpass": raw.info["lowpass"], + "sfreq": raw.info["sfreq"], + "n_channels": len(raw.info["ch_names"]), + }, + "ica_params": { + "n_components": self.n_components, + "method": self.method, + "filter_params": self.filter_params, }, - 'ica_params': { - 'n_components': self.n_components, - 'method': self.method, - 'filter_params': self.filter_params - } } param_str = json.dumps(hash_params, sort_keys=True) return hashlib.md5(param_str.encode()).hexdigest()[:8] @@ -145,15 +144,19 @@ def get_ica_metadata(self, raw: mne.io.RawArray) -> Dict: Complete metadata dictionary. """ return { - 'data_params': self._get_data_params(raw), - 'ica_params': self._get_ica_params(), - 'metadata': { - 'creation_date': datetime.now().isoformat(), - 'raw_filename': str(raw.filenames[0]) if raw.filenames else None - } + "data_params": self._get_data_params(raw), + "ica_params": self._get_ica_params(), + "metadata": { + "creation_date": datetime.now().isoformat(), + "raw_filename": str(raw.filenames[0]) + if raw.filenames + else None, + }, } - def get_ica_path(self, raw: mne.io.RawArray, raw_path: Union[str, Path]) -> tuple[Path, Path]: + def get_ica_path( + self, raw: mne.io.RawArray, raw_path: Union[str, Path] + ) -> tuple[Path, Path]: """Generate path where ICA solution should be stored. Arguments @@ -181,15 +184,12 @@ def get_ica_path(self, raw: mne.io.RawArray, raw_path: Union[str, Path]) -> tupl # For derivatives, you can put them in a derivatives folder: bids_path.root = bids_path.root / ".." / "derivatives" / folder_name - + # Keep the same base entities: bids_path.update( - suffix="eeg", - extension=".fif", - description=desc, - check=True, + suffix="eeg", extension=".fif", description=desc, check=True, ) - + # Make sure the folder is created bids_path.fpath.parent.mkdir(parents=True, exist_ok=True) @@ -198,7 +198,13 @@ def get_ica_path(self, raw: mne.io.RawArray, raw_path: Union[str, Path]) -> tupl return ica_path, metadata_path - def save_ica(self, ica: ICA, ica_path: Path, metadata_path: Path, raw: mne.io.RawArray): + def save_ica( + self, + ica: ICA, + ica_path: Path, + metadata_path: Path, + raw: mne.io.RawArray, + ): """Save ICA solution and metadata to disk. Arguments @@ -224,7 +230,9 @@ def save_ica(self, ica: ICA, ica_path: Path, metadata_path: Path, raw: mne.io.Ra with metadata_path.open("w") as f: json.dump(metadata, f, indent=2) - def check_ica_metadata(self, raw: mne.io.RawArray, metadata_path: Path) -> bool: + def check_ica_metadata( + self, raw: mne.io.RawArray, metadata_path: Path + ) -> bool: """Check if existing ICA metadata matches current parameters. Arguments @@ -247,12 +255,12 @@ def check_ica_metadata(self, raw: mne.io.RawArray, metadata_path: Path) -> bool: # Check data parameters current_data_params = self._get_data_params(raw) - if saved_metadata['data_params'] != current_data_params: + if saved_metadata["data_params"] != current_data_params: return False # Check ICA parameters current_ica_params = self._get_ica_params() - if saved_metadata['ica_params'] != current_ica_params: + if saved_metadata["ica_params"] != current_ica_params: return False return True @@ -322,7 +330,9 @@ def process(raw: mne.io.RawArray, fpath: Union[str, Path]): ica_path, metadata_path = self.get_ica_path(raw, fpath) - if ica_path.exists() and self.check_ica_metadata(raw, metadata_path): + if ica_path.exists() and self.check_ica_metadata( + raw, metadata_path + ): ica = mne.preprocessing.read_ica(ica_path, verbose="ERROR") else: ica = self.compute_ica(raw, ica_path) From f2263e4f8c935b74efb4dac69fb71eabfbe58e22 Mon Sep 17 00:00:00 2001 From: vmcru Date: Fri, 28 Mar 2025 03:14:28 -0400 Subject: [PATCH 30/36] updates to the test files and minor tqeat to ica parameters. --- .github/workflows/tests.yml | 2 +- benchmarks/MOABB/dataio/ica.py | 4 +- tests/benchmarks_utils/test_ica.py | 127 +++++++++++++++++++++++++++++ 3 files changed, 130 insertions(+), 3 deletions(-) create mode 100644 tests/benchmarks_utils/test_ica.py diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 275b626b0..c771b1a58 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -41,4 +41,4 @@ jobs: run: python -c "import sys; print(sys.version)" - name: Consistency tests with pytest run: | - pytest tests \ No newline at end of file + pytest tests diff --git a/benchmarks/MOABB/dataio/ica.py b/benchmarks/MOABB/dataio/ica.py index f40eb2a6e..e51e1202e 100644 --- a/benchmarks/MOABB/dataio/ica.py +++ b/benchmarks/MOABB/dataio/ica.py @@ -60,7 +60,7 @@ def __init__( self.n_components = n_components self.method = method self.random_state = random_state - self.fit_params = fit_params or {} + self._fit_params = fit_params or {} self.filter_params = filter_params or {"l_freq": 1.0, "h_freq": None} self.use_hash = use_hash @@ -296,7 +296,7 @@ def compute_ica(self, raw: mne.io.RawArray, ica_path: Path) -> ICA: n_components=self.n_components, method=self.method, random_state=self.random_state, - **self.fit_params, + **self._fit_params, ) ica.fit(raw_filtered) ica.save(ica_path) diff --git a/tests/benchmarks_utils/test_ica.py b/tests/benchmarks_utils/test_ica.py new file mode 100644 index 000000000..d0f91f0b8 --- /dev/null +++ b/tests/benchmarks_utils/test_ica.py @@ -0,0 +1,127 @@ +"""Test module for ICA processing benchmarks. + +Authors +------- +Victor Cruz, 2025 +""" +import pytest +import time +import mne +import numpy as np +from moabb.datasets import FakeDataset + +from dataio.datasets import EpochedEEGDataset +from dataio.ica import ICAProcessor + + +@pytest.fixture +def dummy_ica_dataset(tmp_path): + """Create a dummy dataset for testing ICA processing.""" + fake_dataset_folder = tmp_path / "MNE-BIDS-Fake" + + if not fake_dataset_folder.exists(): + fake_dataset_folder.mkdir(parents=True) + + dataset = EpochedEEGDataset.from_moabb( + FakeDataset(n_sessions=2, n_runs=2, n_subjects=2, paradigm="imagery"), + fake_dataset_folder / "MNE-BIDS-Fake.json", + save_path=tmp_path, + tmin=0, + tmax=4.0, + output_keys=["label", "subject", "session", "epoch"], + ) + return dataset + + +def test_ica_processor_creation(): + """Test ICA processor initialization.""" + ica_processor = ICAProcessor( + n_components=15, + method="picard", + fit_params={"max_iter": 500}, + filter_params={"l_freq": 1.0, "h_freq": None}, + ) + assert ica_processor.n_components == 15 + assert ica_processor.method == "picard" + assert ica_processor._fit_params == {"max_iter": 500} + + +def test_ica_caching(dummy_ica_dataset): + """Test ICA caching functionality.""" + ica_processor = ICAProcessor( + n_components=15, + method="picard", + fit_params={"max_iter": 500}, + filter_params={"l_freq": 1.0, "h_freq": None}, + ) + + # Add ICA processor to dataset + dataset = dummy_ica_dataset + dataset.add_dynamic_item(ica_processor.dynamic_item) + + # First run - should compute ICA + start = time.time() + for _ in dataset: + pass + computation_time = time.time() - start + + # Second run - should use cache + start = time.time() + for _ in dataset: + pass + cached_time = time.time() - start + + # Cache should be faster + assert cached_time < computation_time + + +def test_ica_hash_consistency(): + """Test that ICA hash is consistent for same parameters.""" + ica_processor1 = ICAProcessor( + n_components=15, + method="picard", + fit_params={"max_iter": 500}, + filter_params={"l_freq": 1.0, "h_freq": None}, + ) + + ica_processor2 = ICAProcessor( + n_components=15, + method="picard", + fit_params={"max_iter": 500}, + filter_params={"l_freq": 1.0, "h_freq": None}, + ) + + # Create dummy raw data + data = np.random.randn(2, 1000) + info = mne.create_info(ch_names=["EEG1", "EEG2"], sfreq=100, ch_types="eeg") + raw = mne.io.RawArray(data, info) + + hash1 = ica_processor1._get_params_hash(raw) + hash2 = ica_processor2._get_params_hash(raw) + + assert hash1 == hash2 + + +def test_different_parameters_different_hash(): + """Test that different ICA parameters produce different hashes.""" + ica_processor1 = ICAProcessor( + n_components=15, + method="picard", + filter_params={"l_freq": 1.0, "h_freq": None}, + ) + + ica_processor2 = ICAProcessor( + n_components=20, # Different number of components + method="picard", + filter_params={"l_freq": 1.0, "h_freq": None}, + ) + + # Create dummy raw data + data = np.random.randn(2, 1000) + info = mne.create_info(ch_names=["EEG1", "EEG2"], sfreq=100, ch_types="eeg") + raw = mne.io.RawArray(data, info) + + hash1 = ica_processor1._get_params_hash(raw) + hash2 = ica_processor2._get_params_hash(raw) + + assert hash1 != hash2 From 210dd9e23c6d1b779252ff85d86075d526b401a4 Mon Sep 17 00:00:00 2001 From: vmcru Date: Fri, 28 Mar 2025 16:50:40 -0400 Subject: [PATCH 31/36] adapted hashing for consistency and reproducibility. removed optionality of hashing inclusion. --- benchmarks/MOABB/dataio/ica.py | 101 ++++++++++++++++++++++++--------- 1 file changed, 73 insertions(+), 28 deletions(-) diff --git a/benchmarks/MOABB/dataio/ica.py b/benchmarks/MOABB/dataio/ica.py index e51e1202e..98835b392 100644 --- a/benchmarks/MOABB/dataio/ica.py +++ b/benchmarks/MOABB/dataio/ica.py @@ -55,17 +55,61 @@ def __init__( random_state=42, fit_params: Optional[Dict[str, Any]] = None, filter_params: Optional[Dict[str, Any]] = None, - use_hash: bool = True, ): self.n_components = n_components self.method = method self.random_state = random_state self._fit_params = fit_params or {} self.filter_params = filter_params or {"l_freq": 1.0, "h_freq": None} - self.use_hash = use_hash + + def _get_effective_filter_params(self, raw: mne.io.RawArray) -> Dict: + """Determine effective filtering parameters considering both data and processing. + + Arguments + --------- + raw : mne.io.RawArray + The raw EEG data. + + Returns + ------- + dict + Effective filter parameters considering both intrinsic and applied filters. + """ + # Get the intrinsic highpass from the data + data_highpass = raw.info["highpass"] + + # Determine effective highpass + if self.filter_params and "l_freq" in self.filter_params: + # If we're applying additional filtering, effective highpass is the higher value + effective_highpass = max( + data_highpass, self.filter_params["l_freq"] + ) + else: + effective_highpass = data_highpass + + # Similarly for lowpass + data_lowpass = raw.info["lowpass"] + if self.filter_params and "h_freq" in self.filter_params: + # For lowpass, take the lower value if we're applying additional filtering + effective_lowpass = ( + min(data_lowpass, self.filter_params["h_freq"]) + if self.filter_params["h_freq"] + else data_lowpass + ) + else: + effective_lowpass = data_lowpass + + return { + "effective_highpass": effective_highpass, + "effective_lowpass": effective_lowpass, + "original_data_highpass": data_highpass, + "original_data_lowpass": data_lowpass, + "additional_filtering": bool(self.filter_params), + "filter_params": self.filter_params, + } def _get_data_params(self, raw: mne.io.RawArray) -> Dict: - """Extract relevant parameters from raw.info. + """Extract relevant parameters from raw.info and processing. Arguments --------- @@ -77,11 +121,14 @@ def _get_data_params(self, raw: mne.io.RawArray) -> Dict: dict Dictionary containing relevant data parameters. """ + filter_info = self._get_effective_filter_params(raw) + return { - "highpass": raw.info["highpass"], - "lowpass": raw.info["lowpass"], + "effective_highpass": filter_info["effective_highpass"], + "effective_lowpass": filter_info["effective_lowpass"], "sfreq": raw.info["sfreq"], "n_channels": len(raw.info["ch_names"]), + "filtering_applied": filter_info["additional_filtering"], } def _get_ica_params(self) -> Dict: @@ -96,12 +143,12 @@ def _get_ica_params(self) -> Dict: "n_components": self.n_components, "method": self.method, "random_state": self.random_state, - "fit_params": self.fit_params, + "fit_params": self._fit_params, "filter_params": self.filter_params, } def _get_params_hash(self, raw: mne.io.RawArray) -> str: - """Generate hash based on both data and ICA parameters. + """Generate hash based on effective parameters. Arguments --------- @@ -113,25 +160,28 @@ def _get_params_hash(self, raw: mne.io.RawArray) -> str: str 8-character hexadecimal hash of the parameters. """ - # Only include parameters that affect the ICA computation + filter_info = self._get_effective_filter_params(raw) + hash_params = { "data_params": { - "highpass": raw.info["highpass"], - "lowpass": raw.info["lowpass"], + "effective_highpass": filter_info["effective_highpass"], + "effective_lowpass": filter_info["effective_lowpass"], "sfreq": raw.info["sfreq"], "n_channels": len(raw.info["ch_names"]), }, "ica_params": { "n_components": self.n_components, "method": self.method, - "filter_params": self.filter_params, + "random_state": self.random_state, + "fit_params": self._fit_params, }, + "filter_params": filter_info["filter_params"], } param_str = json.dumps(hash_params, sort_keys=True) return hashlib.md5(param_str.encode()).hexdigest()[:8] def get_ica_metadata(self, raw: mne.io.RawArray) -> Dict: - """Generate complete metadata including both data and ICA parameters. + """Generate complete metadata including effective parameters. Arguments --------- @@ -143,9 +193,12 @@ def get_ica_metadata(self, raw: mne.io.RawArray) -> Dict: dict Complete metadata dictionary. """ + filter_info = self._get_effective_filter_params(raw) + return { "data_params": self._get_data_params(raw), "ica_params": self._get_ica_params(), + "filter_info": filter_info, "metadata": { "creation_date": datetime.now().isoformat(), "raw_filename": str(raw.filenames[0]) @@ -174,13 +227,9 @@ def get_ica_path( """ bids_path = get_bids_path_from_fname(raw_path) - if self.use_hash: - param_hash = self._get_params_hash(raw) - folder_name = f"ica-{self.method}-{param_hash}" - desc = f"ica{param_hash}" - else: - folder_name = f"ica{self.method}" - desc = "ica" + param_hash = self._get_params_hash(raw) + folder_name = f"ica-{self.method}-{param_hash}" + desc = f"ica{param_hash}" # For derivatives, you can put them in a derivatives folder: bids_path.root = bids_path.root / ".." / "derivatives" / folder_name @@ -266,11 +315,7 @@ def check_ica_metadata( return True def compute_ica(self, raw: mne.io.RawArray, ica_path: Path) -> ICA: - """Compute ICA solution and save to disk. - - If filter_params is provided, applies a high-pass filter before ICA computation. - This step can be skipped if the data is already filtered by setting - filter_params to None during ICAProcessor initialization. + """Compute ICA solution considering effective filtering. Arguments --------- @@ -284,12 +329,13 @@ def compute_ica(self, raw: mne.io.RawArray, ica_path: Path) -> ICA: mne.preprocessing.ICA The computed ICA solution. """ - if self.filter_params is not None: - # Apply high-pass filter only if filter parameters are provided + filter_info = self._get_effective_filter_params(raw) + + # Only apply additional filtering if needed + if filter_info["additional_filtering"]: raw_filtered = raw.copy() raw_filtered.filter(**self.filter_params) else: - # Use raw data directly if no filtering is needed raw_filtered = raw ica = ICA( @@ -299,7 +345,6 @@ def compute_ica(self, raw: mne.io.RawArray, ica_path: Path) -> ICA: **self._fit_params, ) ica.fit(raw_filtered) - ica.save(ica_path) return ica @property From d787c0bc855762b27f67f5880d521551f1dcb251 Mon Sep 17 00:00:00 2001 From: vmcru Date: Fri, 28 Mar 2025 16:50:52 -0400 Subject: [PATCH 32/36] shpeechbrain changes. --- speechbrain | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/speechbrain b/speechbrain index 093c105d4..e602161f4 160000 --- a/speechbrain +++ b/speechbrain @@ -1 +1 @@ -Subproject commit 093c105d405d5ca1537663f516fd587485201420 +Subproject commit e602161f4d305e13a26fc71b7dbe4a4cfeaa8847 From 4b0350112aab80f70bdf046623f66e0075225e42 Mon Sep 17 00:00:00 2001 From: vmcru Date: Fri, 28 Mar 2025 16:52:06 -0400 Subject: [PATCH 33/36] removed validate_ica.py from tracked files --- benchmarks/MOABB/validate_ica.py | 262 ------------------------------- 1 file changed, 262 deletions(-) delete mode 100644 benchmarks/MOABB/validate_ica.py diff --git a/benchmarks/MOABB/validate_ica.py b/benchmarks/MOABB/validate_ica.py deleted file mode 100644 index 665eedaa2..000000000 --- a/benchmarks/MOABB/validate_ica.py +++ /dev/null @@ -1,262 +0,0 @@ -"""File for testing ICA computation and application for EEG data. -Authors -------- -Victor Cruz, 2025 -""" -import time -import mne -import moabb -import logging -from pathlib import Path -from datetime import datetime -from moabb.datasets import BNCI2014_001 -from memory_profiler import profile - -from dataio.datasets import EpochedEEGDataset, InMemoryDataset -from dataio.ica import ICAProcessor - -# Set up logging -mne.set_log_level(verbose=False) -moabb.set_log_level(level="ERROR") - - -# Configure logging -def setup_logging(): - """Set up logging to both file and console. - - The logs are written to a file in the 'logs' directory, with a timestamp - in the filename. The logs are also printed to the console. - - Returns - ------- - logging.Logger - The configured logger instance. - """ - - timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") - log_dir = Path("logs") - log_dir.mkdir(exist_ok=True) - log_file = log_dir / f"ica_benchmark_{timestamp}.log" - - # Configure logging format - formatter = logging.Formatter("%(asctime)s - %(message)s") - - # File handler - file_handler = logging.FileHandler(log_file) - file_handler.setFormatter(formatter) - - # Console handler - console_handler = logging.StreamHandler() - console_handler.setFormatter(formatter) - - # Set up logger - logger = logging.getLogger("ICA_benchmark") - logger.setLevel(logging.INFO) - logger.addHandler(file_handler) - logger.addHandler(console_handler) - - return logger - - -logger = setup_logging() - - -def test_ica_method( - method: str, n_components: int = 15, use_hash: bool = True, **kwargs -): - """Test a specific ICA method and return timing results. - - This function creates an ICAProcessor, runs the EpochedEEGDataset with the - processor, and measures the time taken for various steps, including initial - ICA computation, caching, and in-memory caching. - - Arguments - --------- - method : str - The ICA method to test, either 'picard' or 'infomax'. - n_components : int, optional - The number of ICA components to use, by default 15. - use_hash : bool, optional - Whether to use parameter hashing for caching, by default True. - **kwargs - Additional parameters to pass to the ICAProcessor constructor. - - Returns - ------- - dict - A dictionary containing the timing results for the tested ICA method. - """ - logger.info(f"\nTesting ICA method: {method} (use_hash={use_hash})") - - start = time.time() - ica_processor = ICAProcessor( - n_components=n_components, method=method, use_hash=use_hash, **kwargs - ) - time_init = time.time() - start - logger.info(f"Time to create processor: {time_init:.4f}s") - - start = time.time() - dataset = EpochedEEGDataset.from_moabb( - BNCI2014_001(), - f"data/MNE-BIDS-bnci2014-001-epoched-{method}.json", - save_path="data", - tmin=0, - tmax=4.0, - preload=True, - output_keys=["label", "subject", "session", "epoch"], - dynamic_items=[ica_processor.dynamic_item], - ) - time_create = time.time() - start - logger.info(f"Time to create dataset: {time_create:.2f}s") - - # First run - ICA computation - logger.info("First run (computing ICA):") - start = time.time() - for _ in dataset: - pass - computation_time = time.time() - start - logger.info(f"Time with {method} ICA (first run): {computation_time:.2f}s") - - # Second run - using cached ICA - logger.info("\nSecond run (using cached ICA):") - start = time.time() - for _ in dataset: - pass - cached_time = time.time() - start - logger.info(f"Time with {method} ICA (cached): {cached_time:.2f}s") - - # Memory-cached version - logger.info("\nTesting with InMemoryDataset wrapper:") - dataset_cached = InMemoryDataset(dataset) - start = time.time() - for _ in dataset_cached: - pass - memory_cached_time = time.time() - start - logger.info( - f"Time with {method} ICA (in-memory cache): {memory_cached_time:.2f}s" - ) - - return { - "method": method, - "use_hash": use_hash, - "init_time": time_init, - "create_time": time_create, - "computation_time": computation_time, - "cached_time": cached_time, - "memory_cached_time": memory_cached_time, - } - - -def compare_ica_methods(): - """Compare the performance of different ICA methods. - - This function tests the Picard and Infomax ICA methods, both with and without - parameter hashing for caching. It also tests the baseline performance without - any ICA processing. The results are logged to the console and the log file. - """ - # Test without ICA first as baseline - logger.info("\nTesting without ICA (baseline):") - dataset_no_ica = EpochedEEGDataset.from_moabb( - BNCI2014_001(), - "data/MNE-BIDS-bnci2014-001-epoched.json", - save_path="data", - tmin=0, - tmax=4.0, - output_keys=["label", "subject", "session", "epoch"], - ) - - start = time.time() - for _ in dataset_no_ica: - pass - baseline_time = time.time() - start - logger.info(f"Time without ICA: {baseline_time:.2f}s") - - # Test different ICA methods - results = [] - - # Test Picard with and without hash - for use_hash in [True, False]: - results.append( - test_ica_method( - "picard", - n_components=15, - use_hash=use_hash, - fit_params={"max_iter": 500}, - filter_params={"l_freq": 1.0, "h_freq": None}, - ) - ) - - # Test Infomax with and without hash - for use_hash in [True, False]: - results.append( - test_ica_method( - "infomax", - n_components=15, - use_hash=use_hash, - fit_params={"max_iter": 1000}, - filter_params={"l_freq": 1.0, "h_freq": None}, - ) - ) - - # Print comparison - logger.info("\nComparison Summary:") - logger.info("-" * 70) - logger.info(f"Baseline (no ICA): {baseline_time:.2f}s") - logger.info("-" * 70) - for result in results: - logger.info( - f"Method: {result['method']} (use_hash={result['use_hash']})" - ) - logger.info(f" Initialization time: {result['init_time']:.4f}s") - logger.info(f" Dataset creation time: {result['create_time']:.2f}s") - logger.info(f" Computation time: {result['computation_time']:.2f}s") - logger.info(f" Cached access time: {result['cached_time']:.2f}s") - logger.info( - f" In-memory cached time: {result['memory_cached_time']:.2f}s" - ) - logger.info("-" * 70) - - -@profile -def profile_memory_usage(): - """Profile the memory usage of ICA processing. - - This function runs the ICA processing for both Picard and Infomax methods, - with and without parameter hashing, and profiles the memory usage. - """ - # Profile memory usage for both methods with and without hash - for method in ["picard", "infomax"]: - for use_hash in [True, False]: - logger.info(f"\nProfiling {method} ICA (use_hash={use_hash}):") - ica_processor = ICAProcessor( - n_components=15, - method=method, - use_hash=use_hash, - fit_params={"max_iter": 500 if method == "picard" else 1000}, - filter_params={"l_freq": 1.0, "h_freq": None}, - ) - dataset = EpochedEEGDataset.from_moabb( - BNCI2014_001(), - f"data/MNE-BIDS-bnci2014-001-epoched-{method}.json", - save_path="data", - tmin=0, - tmax=4.0, - preload=True, - output_keys=["label", "subject", "session", "epoch"], - dynamic_items=[ica_processor.dynamic_item], - ) - - for _ in dataset: - pass - - -if __name__ == "__main__": - """Entry point for the ICA benchmark script. - - Runs the ICA method comparison and the memory usage profiling. - """ - logger.info("Running ICA method comparison...") - compare_ica_methods() - - logger.info("\nRunning memory profile...") - profile_memory_usage() From 06f873aa97c8c2a223bee2f72458107009072fdb Mon Sep 17 00:00:00 2001 From: vmcru Date: Tue, 1 Apr 2025 12:06:33 -0400 Subject: [PATCH 34/36] changed folder from derivaties to processor --- benchmarks/MOABB/dataio/ica.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/benchmarks/MOABB/dataio/ica.py b/benchmarks/MOABB/dataio/ica.py index 98835b392..e0b121064 100644 --- a/benchmarks/MOABB/dataio/ica.py +++ b/benchmarks/MOABB/dataio/ica.py @@ -231,8 +231,8 @@ def get_ica_path( folder_name = f"ica-{self.method}-{param_hash}" desc = f"ica{param_hash}" - # For derivatives, you can put them in a derivatives folder: - bids_path.root = bids_path.root / ".." / "derivatives" / folder_name + # For processors, you can put them in a processors folder: + bids_path.root = bids_path.root / ".." / "processors" / folder_name # Keep the same base entities: bids_path.update( From ead65b3a29c4130f43210baa64ae82fef77db954 Mon Sep 17 00:00:00 2001 From: vmcru Date: Tue, 1 Apr 2025 12:10:36 -0400 Subject: [PATCH 35/36] precommit action error fix --- .github/workflows/pre-commit.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/pre-commit.yml b/.github/workflows/pre-commit.yml index 6724b2764..312691205 100644 --- a/.github/workflows/pre-commit.yml +++ b/.github/workflows/pre-commit.yml @@ -9,8 +9,8 @@ jobs: pre-commit: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 - - uses: actions/setup-python@v2 + - uses: actions/checkout + - uses: actions/setup-python with: - python-version: '3.8' - - uses: pre-commit/action@v2.0.3 + python-version: '3.11' + - uses: pre-commit/action From 82cdd903f029cd814373405cd7c2de1765c04fe7 Mon Sep 17 00:00:00 2001 From: vmcru Date: Tue, 1 Apr 2025 12:13:15 -0400 Subject: [PATCH 36/36] precommit action error fix 2 --- .github/workflows/pre-commit.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/pre-commit.yml b/.github/workflows/pre-commit.yml index 312691205..845672314 100644 --- a/.github/workflows/pre-commit.yml +++ b/.github/workflows/pre-commit.yml @@ -9,8 +9,8 @@ jobs: pre-commit: runs-on: ubuntu-latest steps: - - uses: actions/checkout - - uses: actions/setup-python + - uses: actions/checkout@v4.2.2 + - uses: actions/setup-python@v5.5.0 with: python-version: '3.11' - - uses: pre-commit/action + - uses: pre-commit/action@v3.0.1