diff --git a/.gitignore b/.gitignore index a790fe1..ffc51a0 100644 --- a/.gitignore +++ b/.gitignore @@ -82,3 +82,13 @@ celerybeat.pid *.h5 # MyST build outputs _build + +# PyTorch Lightning logs and checkpoints +lightning_logs/ +saved_models/ +.pt_tmp/ +*.ckpt +*.yaml + +# MDN model cache +microimpute_models/ diff --git a/changelog_entry.yaml b/changelog_entry.yaml index e69de29..17b4e47 100644 --- a/changelog_entry.yaml +++ b/changelog_entry.yaml @@ -0,0 +1,4 @@ +- bump: minor + changes: + added: + - Mixture Density Network (MDN) model for numeric variable imputation and Neural Classifier model for categorical variable imputation. diff --git a/microimpute/comparisons/autoimpute.py b/microimpute/comparisons/autoimpute.py index cba83ce..c561542 100644 --- a/microimpute/comparisons/autoimpute.py +++ b/microimpute/comparisons/autoimpute.py @@ -37,14 +37,23 @@ except ImportError: HAS_MATCHING = False + +try: + from microimpute.models import MDN + + HAS_MDN = True +except ImportError: + HAS_MDN = False + log = logging.getLogger(__name__) # Internal constants for model compatibility with variable types -_NUMERICAL_MODELS = {"OLS", "QRF", "QuantReg", "Matching"} +_NUMERICAL_MODELS = {"OLS", "QRF", "QuantReg", "Matching", "MDN"} _CATEGORICAL_MODELS = { "OLS", "QRF", "Matching", + "MDN", } # QuantReg doesn't support categorical @@ -208,7 +217,7 @@ def _evaluate_models_parallel( if len(result) == 3: model_name, cv_result, best_params = result method_results[model_name] = cv_result - if model_name in ["QRF", "Matching"]: + if model_name in ["QRF", "Matching", "MDN"]: best_hyperparams[model_name] = best_params else: model_name, cv_result = result @@ -327,6 +336,7 @@ def autoimpute( random_state: Optional[int] = RANDOM_STATE, train_size: Optional[float] = TRAIN_SIZE, k_folds: Optional[int] = 5, + force_retrain: Optional[bool] = False, log_level: Optional[str] = "WARNING", ) -> AutoImputeResult: """Automatically select and apply the best imputation model. @@ -364,6 +374,8 @@ def autoimpute( random_state : Random seed for reproducibility train_size : Proportion of data to use for training in preprocessing k_folds : Number of folds for cross-validation. Defaults to 5. + force_retrain : If True, forces MDN models to retrain instead of using + cached models. Defaults to False. log_level : Logging level for the function. Defaults to "WARNING". Returns: @@ -440,9 +452,19 @@ def autoimpute( model_classes: List[Type[Imputer]] = [QRF, OLS, QuantReg] if HAS_MATCHING: model_classes.append(Matching) + if HAS_MDN: + model_classes.append(MDN) else: model_classes = models + # Inject force_retrain for MDN if it's in the model list + if force_retrain and any(m.__name__ == "MDN" for m in model_classes): + if hyperparameters is None: + hyperparameters = {} + if "MDN" not in hyperparameters: + hyperparameters["MDN"] = {} + hyperparameters["MDN"]["force_retrain"] = True + # Log hyperparameter usage if hyperparameters: model_names = [ diff --git a/microimpute/comparisons/autoimpute_helpers.py b/microimpute/comparisons/autoimpute_helpers.py index 154234e..1df9307 100644 --- a/microimpute/comparisons/autoimpute_helpers.py +++ b/microimpute/comparisons/autoimpute_helpers.py @@ -291,7 +291,7 @@ def fit_and_predict_model( weight_col=weight_col, quantiles=[quantile], ) - elif hyperparams and model_name in ["Matching", "QRF"]: + elif hyperparams and model_name in ["Matching", "QRF", "MDN"]: # Apply hyperparameters for specific models fitted_model = model.fit( training_data, diff --git a/microimpute/config.py b/microimpute/config.py index e29e69c..bcd51b3 100644 --- a/microimpute/config.py +++ b/microimpute/config.py @@ -39,12 +39,45 @@ # Random state for reproducibility RANDOM_STATE: int = 42 -# Model parameters +# Model parameters (passed via **kwargs to fit() or as __init__ params) DEFAULT_MODEL_PARAMS: Dict[str, Dict[str, Any]] = { - "qrf": {}, - "quantreg": {}, - "ols": {}, - "matching": {}, + "qrf": { + # RandomForestQuantileRegressor parameters + "n_estimators": 100, + "max_depth": None, + "min_samples_split": 2, + "min_samples_leaf": 1, + "max_features": 1.0, + }, + "quantreg": { + # statsmodels QuantReg uses default parameters + }, + "ols": { + # statsmodels OLS uses default parameters + # LogisticRegression params for categorical targets: + "penalty": "l2", + "C": 1.0, + "max_iter": 1000, + }, + "matching": { + # StatMatch NND hotdeck default parameters + }, + "mdn": { + # Backbone network parameters + "layers": "128-64-32", + "activation": "ReLU", + "dropout": 0.0, + "use_batch_norm": False, + # MDN head parameters + "num_gaussian": 5, + "softmax_temperature": 1.0, + "n_samples": 100, + # Training parameters + "learning_rate": 1e-3, + "max_epochs": 100, + "early_stopping_patience": 10, + "batch_size": 256, + }, } # Plotting configuration diff --git a/microimpute/evaluations/cross_validation.py b/microimpute/evaluations/cross_validation.py index db38502..cbf6e9e 100644 --- a/microimpute/evaluations/cross_validation.py +++ b/microimpute/evaluations/cross_validation.py @@ -206,8 +206,8 @@ def _fit_model_for_fold( return None, None raise e - # Handle hyperparameter tuning for QRF and Matching - elif tune_hyperparameters and model_name in ["QRF", "Matching"]: + # Handle hyperparameter tuning for QRF, Matching, and MDN + elif tune_hyperparameters and model_name in ["QRF", "Matching", "MDN"]: log.info(f"Tuning {model_name} hyperparameters during fitting") fitted_model, fold_tuned_params = model.fit( train_data, diff --git a/microimpute/models/__init__.py b/microimpute/models/__init__.py index eb0d105..b243aa6 100644 --- a/microimpute/models/__init__.py +++ b/microimpute/models/__init__.py @@ -9,6 +9,8 @@ - QRF: quantile random forest for non-parametric quantile regression - QuantReg: linear quantile regression model - Matching: statistical matching/hot-deck imputation (optional, requires rpy2) + - MDN: Mixture Density Network for probabilistic imputation + (optional, requires pytorch-tabular) Base classes: - Imputer: abstract base class for all imputation models @@ -23,6 +25,11 @@ except ImportError: pass +try: + from microimpute.models.mdn import MDN +except ImportError: + pass + # Import specific model implementations from microimpute.models.ols import OLS from microimpute.models.qrf import QRF diff --git a/microimpute/models/mdn.py b/microimpute/models/mdn.py new file mode 100644 index 0000000..d2fa6d4 --- /dev/null +++ b/microimpute/models/mdn.py @@ -0,0 +1,1355 @@ +"""Mixture Density Network (MDN) imputation model using PyTorch Tabular.""" + +import hashlib +import os +from pathlib import Path +from typing import Any, Dict, List, Optional, Tuple, Union + +import numpy as np +import pandas as pd +import torch +from pydantic import validate_call + +from microimpute.config import RANDOM_STATE, VALIDATE_CONFIG +from microimpute.models.imputer import ( + Imputer, + ImputerResults, + _ConstantValueModel, +) + +# PyTorch Tabular imports +try: + from pytorch_tabular import TabularModel + from pytorch_tabular.config import ( + DataConfig, + OptimizerConfig, + TrainerConfig, + ) + from pytorch_tabular.models import CategoryEmbeddingModelConfig, MDNConfig + + PYTORCH_TABULAR_AVAILABLE = True +except ImportError: + PYTORCH_TABULAR_AVAILABLE = False + + +def _generate_data_hash(X: pd.DataFrame, y: pd.Series) -> str: + """Generate a hash from the training data for cache identification. + + Creates a reproducible hash based on the data shape, column names, + and a sample of the data values. + + Args: + X: Feature DataFrame. + y: Target Series. + + Returns: + A short hash string identifying the dataset. + """ + # Include shape, column names, and data statistics for identification + hash_components = [ + str(X.shape), + str(sorted(X.columns.tolist())), + str(y.name), + str(len(y)), + ] + + # Add hash of actual data values for uniqueness + # Use pandas hash_pandas_object for consistent hashing + try: + data_hash = pd.util.hash_pandas_object(X).sum() + y_hash = pd.util.hash_pandas_object(y).sum() + hash_components.extend([str(data_hash), str(y_hash)]) + except Exception: + # Fallback to basic stats if hashing fails + hash_components.extend( + [ + str(X.values.mean()) if X.size > 0 else "0", + str(y.mean()) if len(y) > 0 else "0", + ] + ) + + combined = "_".join(hash_components) + return hashlib.md5(combined.encode()).hexdigest()[:12] + + +def _generate_cache_key( + predictors: List[str], target: str, data_hash: str +) -> str: + """Generate a cache key for model storage. + + Args: + predictors: List of predictor column names. + target: Target variable name. + data_hash: Hash of the training data. + + Returns: + A string cache key combining predictor hash, target, and data hash. + """ + # Sort predictors for consistent hashing + sorted_predictors = sorted(predictors) + predictors_str = "_".join(sorted_predictors) + # Create a short hash of the predictors + predictors_hash = hashlib.md5(predictors_str.encode()).hexdigest()[:8] + # Sanitize target name for filesystem + safe_target = target.replace("/", "_").replace("\\", "_") + return f"{predictors_hash}_{safe_target}_{data_hash}" + + +class _MDNModel: + """Internal wrapper for PyTorch Tabular MDN model for numeric targets.""" + + def __init__( + self, + seed: int, + logger, + layers: str = "128-64", + activation: str = "ReLU", + dropout: float = 0.1, + use_batch_norm: bool = False, + num_gaussian: int = 5, + softmax_temperature: float = 1.0, + n_samples: int = 100, + learning_rate: float = 1e-3, + max_epochs: int = 100, + early_stopping_patience: int = 10, + batch_size: int = 64, + ): + self.seed = seed + self.logger = logger + self.layers = layers + self.activation = activation + self.dropout = dropout + self.use_batch_norm = use_batch_norm + self.num_gaussian = num_gaussian + self.softmax_temperature = softmax_temperature + self.n_samples = n_samples + self.learning_rate = learning_rate + self.max_epochs = max_epochs + self.early_stopping_patience = early_stopping_patience + self.batch_size = batch_size + + self.model = None + self.output_column = None + + def fit( + self, + X: pd.DataFrame, + y: pd.Series, + ) -> None: + """Fit the MDN model. + + Args: + X: Feature DataFrame (predictors are already dummy-encoded). + y: Target Series. + """ + self.output_column = y.name + + # Combine X and y for PyTorch Tabular + train_data = X.copy() + train_data[y.name] = y + + # All predictors are continuous (categorical predictors were + # dummy-encoded during preprocessing in the base Imputer class) + continuous_cols = X.columns.tolist() + + # Configure data + data_config = DataConfig( + target=[y.name], + continuous_cols=continuous_cols, + categorical_cols=[], + ) + + # Configure trainer + # Note: checkpoints and load_best disabled to avoid PyTorch 2.6 + # weights_only=True issue with OmegaConf objects + trainer_config = TrainerConfig( + max_epochs=self.max_epochs, + batch_size=self.batch_size, + early_stopping="valid_loss", + early_stopping_patience=self.early_stopping_patience, + checkpoints=None, + load_best=False, + accelerator="auto", + devices=1, + trainer_kwargs={ + "enable_progress_bar": False, + "enable_model_summary": False, + }, + ) + + # Configure optimizer + optimizer_config = OptimizerConfig( + optimizer="Adam", + lr_scheduler=None, + ) + + # Configure MDN model + model_config = MDNConfig( + task="regression", + backbone_config_class="CategoryEmbeddingModelConfig", + backbone_config_params={ + "task": "backbone", # Required: tells it to act as feature extractor + "layers": self.layers, + "activation": self.activation, + "dropout": self.dropout, + "use_batch_norm": self.use_batch_norm, + }, + head_config={ + "num_gaussian": self.num_gaussian, + "softmax_temperature": self.softmax_temperature, + "n_samples": self.n_samples, + }, + learning_rate=self.learning_rate, + seed=self.seed, + ) + + # Create and train model + self.model = TabularModel( + data_config=data_config, + model_config=model_config, + optimizer_config=optimizer_config, + trainer_config=trainer_config, + ) + + self.model.fit(train=train_data) + + def predict(self, X: pd.DataFrame, n_samples: int = 1) -> np.ndarray: + """Sample from the MDN distribution. + + Predictions are made by stochastically sampling from the learned + mixture distribution, not by returning point estimates. + + Args: + X: Feature DataFrame. + n_samples: Number of samples per observation. + + Returns: + Array of shape (n_observations,) if n_samples=1, or + (n_observations, n_samples) with sampled values. + """ + # Put model in eval mode + self.model.model.eval() + + # Create inference dataloader + test_loader = self.model.datamodule.prepare_inference_dataloader(X) + + samples_list = [] + with torch.no_grad(): + for batch in test_loader: + # Move batch tensors to device + for k, v in batch.items(): + if isinstance(v, torch.Tensor): + batch[k] = v.to(self.model.model.device) + + # Sample from the mixture distribution + samples = self.model.model.sample(batch, n_samples=n_samples) + + if n_samples == 1: + samples_list.append(samples.squeeze(-1).cpu().numpy()) + else: + samples_list.append(samples.cpu().numpy()) + + return np.concatenate(samples_list) + + def save(self, path: str) -> None: + """Save the model to disk.""" + self.model.save_model(path) + + @classmethod + def load(cls, path: str, seed: int, logger) -> "_MDNModel": + """Load a model from disk.""" + instance = cls(seed=seed, logger=logger) + instance.model = TabularModel.load_model(path) + return instance + + +class _NeuralClassifierModel: + """Internal wrapper for PyTorch Tabular classifier for categorical/boolean.""" + + def __init__( + self, + seed: int, + logger, + layers: str = "128-64", + activation: str = "ReLU", + dropout: float = 0.1, + use_batch_norm: bool = False, + learning_rate: float = 1e-3, + max_epochs: int = 100, + early_stopping_patience: int = 10, + batch_size: int = 64, + ): + self.seed = seed + self.logger = logger + self.layers = layers + self.activation = activation + self.dropout = dropout + self.use_batch_norm = use_batch_norm + self.learning_rate = learning_rate + self.max_epochs = max_epochs + self.early_stopping_patience = early_stopping_patience + self.batch_size = batch_size + + self.model = None + self.output_column = None + self.var_type = None + self.categories = None + self.label_map = None + + def fit( + self, + X: pd.DataFrame, + y: pd.Series, + var_type: str, + categories: Optional[List] = None, + ) -> None: + """Fit the neural classifier. + + Args: + X: Feature DataFrame (predictors are already dummy-encoded). + y: Target Series (original categorical/boolean column). + var_type: Type of variable ("boolean" or "categorical"). + categories: List of categories for categorical variables. + """ + self.output_column = y.name + self.var_type = var_type + + if var_type == "boolean": + y_encoded = y.astype(int) + self.categories = [False, True] + else: + self.categories = categories if categories else y.unique().tolist() + self.label_map = {cat: i for i, cat in enumerate(self.categories)} + y_encoded = y.map(self.label_map) + + if y_encoded.isna().any(): + self.logger.warning( + f"Found {y_encoded.isna().sum()} unmapped values in " + f"{self.output_column}" + ) + y_encoded = y_encoded.fillna(0) + + # Combine X and encoded y for PyTorch Tabular + train_data = X.copy() + train_data[y.name] = y_encoded.astype(int) + + # All predictors are continuous (categorical predictors were + # dummy-encoded during preprocessing in the base Imputer class) + continuous_cols = X.columns.tolist() + + # Configure data + data_config = DataConfig( + target=[y.name], + continuous_cols=continuous_cols, + categorical_cols=[], + ) + + # Configure trainer + # Note: checkpoints and load_best disabled to avoid PyTorch 2.6 + # weights_only=True issue with OmegaConf objects + trainer_config = TrainerConfig( + max_epochs=self.max_epochs, + batch_size=self.batch_size, + early_stopping="valid_loss", + early_stopping_patience=self.early_stopping_patience, + checkpoints=None, + load_best=False, + accelerator="auto", + devices=1, + trainer_kwargs={ + "enable_progress_bar": False, + "enable_model_summary": False, + }, + ) + + # Configure optimizer + optimizer_config = OptimizerConfig( + optimizer="Adam", + lr_scheduler=None, + ) + + # Configure classifier model with LinearHead + model_config = CategoryEmbeddingModelConfig( + task="classification", + layers=self.layers, + activation=self.activation, + dropout=self.dropout, + use_batch_norm=self.use_batch_norm, + learning_rate=self.learning_rate, + seed=self.seed, + ) + + # Create and train model + self.model = TabularModel( + data_config=data_config, + model_config=model_config, + optimizer_config=optimizer_config, + trainer_config=trainer_config, + ) + + self.model.fit(train=train_data) + + def predict( + self, + X: pd.DataFrame, + return_probs: bool = False, + ) -> Union[pd.Series, Dict[str, Any]]: + """Predict classes by stochastically sampling from probability distribution. + + Predictions are made by sampling from the predicted probability + distribution, not by returning the argmax class. + + Args: + X: Input features. + return_probs: If True, return probability distributions. + + Returns: + Predicted values as Series, or dict with probabilities if + return_probs=True. + """ + # Get predictions with probabilities + preds_df = self.model.predict(X, ret_logits=False) + + # Extract probability columns (named like target_0_probability, etc.) + prob_cols = sorted( + [c for c in preds_df.columns if c.endswith("_probability")] + ) + probs = preds_df[prob_cols].values + + if return_probs: + if self.var_type == "boolean": + original_classes = [False, True] + else: + original_classes = self.categories + + return { + "probabilities": probs, + "classes": np.array(original_classes), + } + + # Stochastically sample from probability distribution + rng = np.random.default_rng(self.seed) + sampled_indices = np.array( + [rng.choice(len(self.categories), p=p) for p in probs] + ) + + if self.var_type == "boolean": + predictions = pd.Series( + sampled_indices.astype(bool), index=X.index + ) + else: + predictions = pd.Series( + [self.categories[i] for i in sampled_indices], index=X.index + ) + + predictions.name = self.output_column + return predictions + + def save(self, path: str) -> None: + """Save the model to disk.""" + self.model.save_model(path) + # Also save metadata + import json + + metadata = { + "output_column": self.output_column, + "var_type": self.var_type, + "categories": [ + str(c) if not isinstance(c, (bool, int, float)) else c + for c in self.categories + ], + "label_map": self.label_map, + } + with open(os.path.join(path, "classifier_metadata.json"), "w") as f: + json.dump(metadata, f) + + @classmethod + def load(cls, path: str, seed: int, logger) -> "_NeuralClassifierModel": + """Load a model from disk.""" + import json + + instance = cls(seed=seed, logger=logger) + instance.model = TabularModel.load_model(path) + + # Load metadata + with open(os.path.join(path, "classifier_metadata.json"), "r") as f: + metadata = json.load(f) + + instance.output_column = metadata["output_column"] + instance.var_type = metadata["var_type"] + instance.categories = metadata["categories"] + instance.label_map = metadata["label_map"] + + # Convert boolean strings back + if instance.var_type == "boolean": + instance.categories = [False, True] + + return instance + + +class MDNResults(ImputerResults): + """Fitted MDN model container ready for imputation.""" + + def __init__( + self, + models: Dict[str, Any], + predictors: List[str], + imputed_variables: List[str], + seed: int, + imputed_vars_dummy_info: Optional[Dict[str, str]] = None, + original_predictors: Optional[List[str]] = None, + categorical_targets: Optional[Dict[str, Dict]] = None, + boolean_targets: Optional[Dict[str, Dict]] = None, + constant_targets: Optional[Dict[str, Dict]] = None, + dummy_processor: Optional[Any] = None, + log_level: Optional[str] = "WARNING", + ) -> None: + """Initialize MDN results. + + Args: + models: Dictionary of fitted models for each variable. + predictors: List of predictor variable names. + imputed_variables: List of imputed variable names. + seed: Random seed for reproducibility. + imputed_vars_dummy_info: Optional dummy variable info. + original_predictors: Original predictor names before encoding. + categorical_targets: Dictionary of categorical target info. + boolean_targets: Dictionary of boolean target info. + constant_targets: Dictionary of constant target info. + dummy_processor: Processor for handling dummy encoding. + log_level: Logging level. + """ + super().__init__( + predictors, + imputed_variables, + seed, + imputed_vars_dummy_info, + original_predictors, + log_level, + ) + self.models = models + self.categorical_targets = categorical_targets or {} + self.boolean_targets = boolean_targets or {} + self.constant_targets = constant_targets or {} + self.dummy_processor = dummy_processor + + @validate_call(config=VALIDATE_CONFIG) + def _predict( + self, + X_test: pd.DataFrame, + quantiles: Optional[List[float]] = None, + return_probs: bool = False, + ) -> Dict[float, pd.DataFrame]: + """Predict imputed values using stochastic sampling. + + For MDN models, samples are drawn from the learned mixture distribution. + For classifier models, samples are drawn from the predicted probability + distribution. + + Args: + X_test: DataFrame containing the test data. + quantiles: List of quantiles (used to determine number of + independent samples to draw). + return_probs: If True, return probability distributions for + categorical variables. + + Returns: + Dictionary mapping quantiles to imputed DataFrames. + If return_probs=True, includes 'probabilities' key. + """ + try: + imputations: Dict[float, pd.DataFrame] = {} + prob_results = {} if return_probs else None + + # Determine how many independent samples to draw + if quantiles: + quantiles_to_use = quantiles + else: + quantiles_to_use = [0.5] # Default single sample + + for q in quantiles_to_use: + imputed_df = pd.DataFrame(index=X_test.index) + + for variable in self.imputed_variables: + model = self.models[variable] + + if isinstance(model, _ConstantValueModel): + imputed_df[variable] = model.predict(X_test) + + elif isinstance(model, _NeuralClassifierModel): + # Stochastic sampling from probability distribution + if return_probs and prob_results is not None: + prob_info = model.predict( + X_test[self.predictors], + return_probs=True, + ) + prob_results[variable] = prob_info + + imputed_df[variable] = model.predict( + X_test[self.predictors], + return_probs=False, + ) + + elif isinstance(model, _MDNModel): + # Stochastic sampling from MDN mixture distribution + samples = model.predict( + X_test[self.predictors], n_samples=1 + ) + imputed_df[variable] = samples.flatten() + + else: + raise ValueError( + f"Unknown model type for variable {variable}" + ) + + imputations[q] = imputed_df + + # Add probabilities if requested + if return_probs and prob_results: + imputations["probabilities"] = prob_results + + # Return format based on whether quantiles were specified + if quantiles is not None: + return imputations + else: + return imputations[0.5] + + except Exception as e: + self.logger.error(f"Error during MDN prediction: {str(e)}") + raise RuntimeError( + f"Failed to predict with MDN model: {str(e)}" + ) from e + + +class MDN(Imputer): + """ + Mixture Density Network imputer using PyTorch Tabular. + + This imputer uses a neural network with a Mixture Density Network head + for numeric targets and a neural classifier with LinearHead for + categorical/boolean targets. Both share the same backbone architecture. + + Predictions are made by stochastically sampling from the learned + distributions rather than returning point estimates. + + Models are automatically cached based on a hash of the training data, + so identical datasets will reuse previously trained models. + + Attributes: + layers: Network architecture as hyphen-separated string (e.g., "128-64"). + activation: Activation function name (e.g., "ReLU", "LeakyReLU"). + dropout: Dropout probability. + use_batch_norm: Whether to use batch normalization. + num_gaussian: Number of Gaussian components in MDN mixture. + softmax_temperature: Temperature for mixture weight softmax. + n_samples: Number of samples for MDN prediction. + learning_rate: Learning rate for training. + max_epochs: Maximum training epochs. + early_stopping_patience: Patience for early stopping. + batch_size: Training batch size. + model_dir: Directory for saving/loading models. + force_retrain: If True, always retrain instead of loading cached models. + """ + + def __init__( + self, + # Backbone config (shared) + layers: str = "128-64-32", + activation: str = "ReLU", + dropout: float = 0.0, + use_batch_norm: bool = False, + # MDN Head config + num_gaussian: int = 5, + softmax_temperature: float = 1.0, + n_samples: int = 100, + # Training config + learning_rate: float = 1e-3, + max_epochs: int = 100, + early_stopping_patience: int = 10, + batch_size: int = 256, + # Caching config + model_dir: str = "./microimpute_models", + force_retrain: bool = False, + # Standard imputer params + seed: Optional[int] = RANDOM_STATE, + log_level: Optional[str] = "WARNING", + ) -> None: + """Initialize the MDN imputer. + + Args: + layers: Network architecture (e.g., "128-64-32" for three layers). + activation: Activation function (ReLU, LeakyReLU, etc.). + dropout: Dropout probability. + use_batch_norm: Whether to use batch normalization. + num_gaussian: Number of Gaussian components in mixture. + softmax_temperature: Temperature for mixture weights. + n_samples: Number of samples for prediction. + learning_rate: Learning rate for Adam optimizer. + max_epochs: Maximum training epochs. + early_stopping_patience: Early stopping patience. + batch_size: Training batch size. + model_dir: Directory for saving/loading models. + force_retrain: If True, skip cache and always retrain. + seed: Random seed for reproducibility. + log_level: Logging level. + """ + if not PYTORCH_TABULAR_AVAILABLE: + raise ImportError( + "pytorch-tabular is required for MDN imputer. " + "Install it with: pip install pytorch_tabular" + ) + + super().__init__(seed=seed, log_level=log_level) + + self.layers = layers + self.activation = activation + self.dropout = dropout + self.use_batch_norm = use_batch_norm + self.num_gaussian = num_gaussian + self.softmax_temperature = softmax_temperature + self.n_samples = n_samples + self.learning_rate = learning_rate + self.max_epochs = max_epochs + self.early_stopping_patience = early_stopping_patience + self.batch_size = batch_size + self.model_dir = model_dir + self.force_retrain = force_retrain + self.log_level = log_level + + self.models = {} + + self.logger.debug("Initializing MDN imputer") + + def _get_cache_path( + self, predictors: List[str], target: str, data_hash: str + ) -> str: + """Get the cache path for a model. + + Args: + predictors: List of predictor column names. + target: Target variable name. + data_hash: Hash of the training data. + + Returns: + Path to the model cache directory. + """ + cache_key = _generate_cache_key(predictors, target, data_hash) + return os.path.join(self.model_dir, cache_key) + + def _model_exists(self, cache_path: Optional[str]) -> bool: + """Check if a cached model exists. + + Args: + cache_path: Path to check. + + Returns: + True if model exists at path. + """ + if cache_path is None: + return False + return os.path.exists(cache_path) and os.path.isdir(cache_path) + + @validate_call(config=VALIDATE_CONFIG) + def _fit( + self, + X_train: pd.DataFrame, + predictors: List[str], + imputed_variables: List[str], + original_predictors: Optional[List[str]] = None, + categorical_targets: Optional[Dict[str, Dict]] = None, + boolean_targets: Optional[Dict[str, Dict]] = None, + numeric_targets: Optional[List[str]] = None, + constant_targets: Optional[Dict[str, Dict]] = None, + tune_hyperparameters: bool = False, + **kwargs: Any, + ) -> Union[MDNResults, Tuple[MDNResults, Dict[str, Any]]]: + """Fit the MDN model to the training data. + + Args: + X_train: DataFrame containing the training data. + predictors: List of column names to use as predictors. + imputed_variables: List of column names to impute. + original_predictors: Original predictor names before encoding. + categorical_targets: Dict of categorical target info. + boolean_targets: Dict of boolean target info. + numeric_targets: List of numeric target names. + constant_targets: Dict of constant target info. + tune_hyperparameters: If True, tune hyperparameters before fitting. + **kwargs: Additional parameters. + + Returns: + MDNResults instance with fitted models. + If tune_hyperparameters=True, returns (MDNResults, best_params). + """ + try: + best_params = None + + # Optionally tune hyperparameters before fitting + if tune_hyperparameters: + self.logger.info("Starting hyperparameter tuning...") + best_params = self._tune_hyperparameters( + X_train, + predictors, + imputed_variables, + categorical_targets, + boolean_targets, + ) + + # Apply tuned parameters + if isinstance(best_params, dict): + if "mdn" in best_params: + # Mixed types - apply MDN params + mdn_params = best_params["mdn"] + if "num_gaussian" in mdn_params: + self.num_gaussian = mdn_params["num_gaussian"] + if "learning_rate" in mdn_params: + self.learning_rate = mdn_params["learning_rate"] + # Classifier params stored separately + elif "classifier" in best_params: + # Only categorical - apply to learning rate + classifier_params = best_params["classifier"] + if "learning_rate" in classifier_params: + self.learning_rate = classifier_params[ + "learning_rate" + ] + else: + # Only numeric - flat dict + if "num_gaussian" in best_params: + self.num_gaussian = best_params["num_gaussian"] + if "learning_rate" in best_params: + self.learning_rate = best_params["learning_rate"] + + self.logger.info( + f"Applied tuned hyperparameters: {best_params}" + ) + + self.logger.info( + f"Fitting MDN model with {len(predictors)} predictors" + ) + + self.models = {} + + # Ensure model directory exists + Path(self.model_dir).mkdir(parents=True, exist_ok=True) + + for variable in imputed_variables: + # Handle constant targets (no caching needed) + if variable in (constant_targets or {}): + constant_val = constant_targets[variable]["value"] + model = _ConstantValueModel(constant_val, variable) + self.models[variable] = model + self.logger.info( + f"Using constant value {constant_val} for {variable}" + ) + continue + + # Generate data hash for caching + Y = X_train[variable] + data_hash = _generate_data_hash(X_train[predictors], Y) + cache_path = self._get_cache_path( + predictors, variable, data_hash + ) + + # Check cache + if not self.force_retrain and self._model_exists(cache_path): + self.logger.info( + f"Loading cached model for '{variable}' from {cache_path}" + ) + try: + if variable in ( + categorical_targets or {} + ) or variable in (boolean_targets or {}): + model = _NeuralClassifierModel.load( + cache_path, self.seed, self.logger + ) + else: + model = _MDNModel.load( + cache_path, self.seed, self.logger + ) + self.models[variable] = model + continue + except Exception as e: + self.logger.warning( + f"Failed to load cached model: {e}. Retraining..." + ) + + # Choose model based on variable type + if variable in (categorical_targets or {}): + self.logger.warning( + f"Using neural network for categorical variable " + f"'{variable}'. This may be computationally expensive " + f"for simple classification tasks." + ) + + model = _NeuralClassifierModel( + seed=self.seed, + logger=self.logger, + layers=self.layers, + activation=self.activation, + dropout=self.dropout, + use_batch_norm=self.use_batch_norm, + learning_rate=self.learning_rate, + max_epochs=self.max_epochs, + early_stopping_patience=self.early_stopping_patience, + batch_size=self.batch_size, + ) + model.fit( + X_train[predictors], + Y, + var_type=categorical_targets[variable]["type"], + categories=categorical_targets[variable].get( + "categories" + ), + ) + self.logger.info( + f"Neural classifier fitted for categorical variable " + f"{variable}" + ) + + elif variable in (boolean_targets or {}): + self.logger.warning( + f"Using neural network for boolean variable " + f"'{variable}'. This may be computationally expensive " + f"for simple classification tasks." + ) + + model = _NeuralClassifierModel( + seed=self.seed, + logger=self.logger, + layers=self.layers, + activation=self.activation, + dropout=self.dropout, + use_batch_norm=self.use_batch_norm, + learning_rate=self.learning_rate, + max_epochs=self.max_epochs, + early_stopping_patience=self.early_stopping_patience, + batch_size=self.batch_size, + ) + model.fit( + X_train[predictors], + Y, + var_type="boolean", + ) + self.logger.info( + f"Neural classifier fitted for boolean variable " + f"{variable}" + ) + + else: + # Numeric target - use MDN + self.logger.info( + f"Training MDN for numeric variable {variable}" + ) + + model = _MDNModel( + seed=self.seed, + logger=self.logger, + layers=self.layers, + activation=self.activation, + dropout=self.dropout, + use_batch_norm=self.use_batch_norm, + num_gaussian=self.num_gaussian, + softmax_temperature=self.softmax_temperature, + n_samples=self.n_samples, + learning_rate=self.learning_rate, + max_epochs=self.max_epochs, + early_stopping_patience=self.early_stopping_patience, + batch_size=self.batch_size, + ) + model.fit( + X_train[predictors], + Y, + ) + self.logger.info( + f"MDN fitted for numeric variable {variable}" + ) + + self.models[variable] = model + + # Save to cache + try: + Path(cache_path).mkdir(parents=True, exist_ok=True) + model.save(cache_path) + self.logger.info(f"Saved model to {cache_path}") + except Exception as e: + self.logger.warning(f"Failed to save model: {e}") + + results = MDNResults( + models=self.models, + predictors=predictors, + imputed_variables=imputed_variables, + imputed_vars_dummy_info=self.imputed_vars_dummy_info, + original_predictors=self.original_predictors, + categorical_targets=categorical_targets, + boolean_targets=boolean_targets, + constant_targets=constant_targets, + dummy_processor=getattr(self, "dummy_processor", None), + seed=self.seed, + log_level=self.log_level, + ) + + # Return tuple if hyperparameter tuning was performed + if tune_hyperparameters and best_params is not None: + return results, best_params + return results + + except Exception as e: + self.logger.error(f"Error fitting MDN model: {str(e)}") + raise RuntimeError(f"Failed to fit MDN model: {str(e)}") from e + + def _tune_mdn_hyperparameters( + self, + data: pd.DataFrame, + predictors: List[str], + numeric_vars: List[str], + n_cv_folds: int = 3, + n_trials: int = 10, + ) -> Dict[str, Any]: + """Tune MDN hyperparameters using Optuna with cross-validation. + + Tunes num_gaussian and learning_rate for numeric targets. + + Args: + data: Full training data. + predictors: List of column names to use as predictors. + numeric_vars: List of numeric variables to impute. + n_cv_folds: Number of CV folds (default: 3). + n_trials: Number of Optuna trials (default: 10). + + Returns: + Dictionary of tuned hyperparameters for MDN. + """ + import optuna + from sklearn.model_selection import KFold + + from microimpute.comparisons.metrics import compute_loss + + # Suppress Optuna's logs during optimization + optuna.logging.set_verbosity(optuna.logging.WARNING) + + # Set up CV folds + kf = KFold(n_splits=n_cv_folds, shuffle=True, random_state=self.seed) + + def objective(trial: optuna.Trial) -> float: + # Only tune num_gaussian and learning_rate + num_gaussian = trial.suggest_int("num_gaussian", 2, 10) + learning_rate = trial.suggest_float( + "learning_rate", 1e-4, 1e-2, log=True + ) + + # Track errors across CV folds + fold_errors = [] + + # Perform CV + for train_idx, val_idx in kf.split(data): + X_train_fold = data.iloc[train_idx] + X_val_fold = data.iloc[val_idx] + + # Track errors for numeric variables in this fold + var_errors = [] + + for var in numeric_vars: + y_train = X_train_fold[var] + y_val = X_val_fold[var] + + # Create and fit MDN model with trial parameters + model = _MDNModel( + seed=self.seed, + logger=self.logger, + layers=self.layers, + activation=self.activation, + dropout=self.dropout, + use_batch_norm=self.use_batch_norm, + num_gaussian=num_gaussian, + softmax_temperature=self.softmax_temperature, + n_samples=self.n_samples, + learning_rate=learning_rate, + max_epochs=40, # Reduced for tuning + early_stopping_patience=self.early_stopping_patience, + batch_size=self.batch_size, + ) + model.fit(X_train_fold[predictors], y_train) + + # Predict using stochastic sampling + y_pred = model.predict(X_val_fold[predictors], n_samples=1) + + # Use quantile loss with median (q=0.5) for tuning + _, quantile_loss_value = compute_loss( + y_val.values.flatten(), + y_pred.flatten(), + "quantile_loss", + q=0.5, + ) + + # Normalize by variable's standard deviation + std = np.std(y_val.values.flatten()) + normalized_loss = ( + quantile_loss_value / std + if std > 0 + else quantile_loss_value + ) + + var_errors.append(normalized_loss) + + # Average across variables for this fold + if var_errors: + fold_errors.append(np.mean(var_errors)) + + # Return mean error across all CV folds + return np.mean(fold_errors) if fold_errors else float("inf") + + # Create and run the study + study = optuna.create_study( + direction="minimize", + sampler=optuna.samplers.TPESampler(seed=self.seed), + ) + + study.optimize(objective, n_trials=n_trials) + + best_value = study.best_value + self.logger.info( + f"MDN - Lowest average normalized quantile loss " + f"({n_cv_folds}-fold CV): {best_value}" + ) + + best_params = study.best_params + self.logger.info(f"MDN - Best hyperparameters found: {best_params}") + + return best_params + + def _tune_classifier_hyperparameters( + self, + data: pd.DataFrame, + predictors: List[str], + categorical_vars: List[str], + categorical_targets: Optional[Dict] = None, + boolean_targets: Optional[Dict] = None, + n_cv_folds: int = 3, + n_trials: int = 10, + ) -> Dict[str, Any]: + """Tune neural classifier hyperparameters using Optuna with CV. + + Tunes learning_rate for categorical/boolean targets. + + Args: + data: Full training data. + predictors: List of column names to use as predictors. + categorical_vars: List of categorical/boolean variables to impute. + categorical_targets: Dict of categorical target info. + boolean_targets: Dict of boolean target info. + n_cv_folds: Number of CV folds (default: 3). + n_trials: Number of Optuna trials (default: 10). + + Returns: + Dictionary of tuned hyperparameters for classifier. + """ + import optuna + from sklearn.model_selection import KFold + + from microimpute.comparisons.metrics import ( + compute_loss, + order_probabilities_alphabetically, + ) + + # Suppress Optuna's logs during optimization + optuna.logging.set_verbosity(optuna.logging.WARNING) + + categorical_targets = categorical_targets or {} + boolean_targets = boolean_targets or {} + + # Set up CV folds + kf = KFold(n_splits=n_cv_folds, shuffle=True, random_state=self.seed) + + def objective(trial: optuna.Trial) -> float: + # Only tune learning_rate for classifier + learning_rate = trial.suggest_float( + "learning_rate", 1e-4, 1e-2, log=True + ) + + # Track errors across CV folds + fold_errors = [] + + # Perform CV + for train_idx, val_idx in kf.split(data): + X_train_fold = data.iloc[train_idx] + X_val_fold = data.iloc[val_idx] + + # Track errors for categorical variables in this fold + var_errors = [] + + for var in categorical_vars: + y_train = X_train_fold[var] + y_val = X_val_fold[var] + + # Determine variable type + if var in boolean_targets: + var_type = "boolean" + categories = None + else: + var_type = categorical_targets[var].get( + "type", "categorical" + ) + categories = categorical_targets[var].get("categories") + + # Create and fit classifier with trial parameters + # Use 40 epochs for tuning + model = _NeuralClassifierModel( + seed=self.seed, + logger=self.logger, + layers=self.layers, + activation=self.activation, + dropout=self.dropout, + use_batch_norm=self.use_batch_norm, + learning_rate=learning_rate, + max_epochs=40, # Reduced for tuning + early_stopping_patience=self.early_stopping_patience, + batch_size=self.batch_size, + ) + model.fit( + X_train_fold[predictors], + y_train, + var_type=var_type, + categories=categories, + ) + + # Get probabilities for log loss calculation + prob_info = model.predict( + X_val_fold[predictors], return_probs=True + ) + probs = prob_info["probabilities"] + classes = prob_info["classes"] + + # Order probabilities alphabetically for consistent evaluation + probs, classes = order_probabilities_alphabetically( + probs, classes + ) + + # Compute log loss + _, log_loss_value = compute_loss( + y_val.values, + probs, + "log_loss", + labels=classes, + ) + + var_errors.append(log_loss_value) + + # Average across variables for this fold + if var_errors: + fold_errors.append(np.mean(var_errors)) + + # Return mean error across all CV folds + return np.mean(fold_errors) if fold_errors else float("inf") + + # Create and run the study + study = optuna.create_study( + direction="minimize", + sampler=optuna.samplers.TPESampler(seed=self.seed), + ) + + study.optimize(objective, n_trials=n_trials) + + best_value = study.best_value + self.logger.info( + f"Classifier - Lowest average log loss " + f"({n_cv_folds}-fold CV): {best_value}" + ) + + best_params = study.best_params + self.logger.info( + f"Classifier - Best hyperparameters found: {best_params}" + ) + + return best_params + + def _tune_hyperparameters( + self, + data: pd.DataFrame, + predictors: List[str], + imputed_variables: List[str], + categorical_targets: Optional[Dict] = None, + boolean_targets: Optional[Dict] = None, + n_cv_folds: int = 3, + n_trials: int = 10, + ) -> Dict[str, Any]: + """Coordinate hyperparameter tuning for MDN. + + Automatically detects variable types and tunes appropriate models: + - Numeric variables: MDN with num_gaussian and learning_rate + - Categorical/Boolean variables: Classifier with learning_rate + + Args: + data: DataFrame containing the training data. + predictors: List of column names to use as predictors. + imputed_variables: List of column names to impute. + categorical_targets: Dict of categorical target info. + boolean_targets: Dict of boolean target info. + + Returns: + Dictionary of tuned hyperparameters. Format depends on variable types: + - Only numeric: flat dict with MDN params + - Only categorical: flat dict with classifier params + - Mixed: nested dict {"mdn": {...}, "classifier": {...}} + """ + categorical_targets = categorical_targets or {} + boolean_targets = boolean_targets or {} + + # Separate variables by type + categorical_vars = [ + var + for var in imputed_variables + if var in categorical_targets or var in boolean_targets + ] + numeric_vars = [ + var for var in imputed_variables if var not in categorical_vars + ] + + self.logger.info( + f"MDN hyperparameter tuning with {n_cv_folds}-fold CV and " + f"{n_trials} trials: {len(numeric_vars)} numeric variables, " + f"{len(categorical_vars)} categorical/boolean variables" + ) + + # Tune appropriate models based on variable types + if not categorical_vars: + # Only numeric variables + self.logger.info( + "Tuning MDN hyperparameters (numeric variables only)" + ) + return self._tune_mdn_hyperparameters( + data, predictors, numeric_vars, n_cv_folds, n_trials + ) + elif not numeric_vars: + # Only categorical variables + self.logger.info( + "Tuning classifier hyperparameters " + "(categorical/boolean variables only)" + ) + return self._tune_classifier_hyperparameters( + data, + predictors, + categorical_vars, + categorical_targets, + boolean_targets, + n_cv_folds, + n_trials, + ) + else: + # Mixed: tune both separately + self.logger.info( + "Tuning both MDN and classifier hyperparameters " + "(mixed variable types)" + ) + mdn_params = self._tune_mdn_hyperparameters( + data, predictors, numeric_vars, n_cv_folds, n_trials + ) + classifier_params = self._tune_classifier_hyperparameters( + data, + predictors, + categorical_vars, + categorical_targets, + boolean_targets, + n_cv_folds, + n_trials, + ) + return {"mdn": mdn_params, "classifier": classifier_params} diff --git a/pyproject.toml b/pyproject.toml index 7c2d63c..d3e6094 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -47,6 +47,11 @@ matching = [ "rpy2>=3.5.0,<4.0.0", ] +mdn = [ + "pytorch-tabular>=1.1.0", + "torch>=2.0.0", +] + docs = [ "jupyter-book", "furo>=2024.0.0", # Sphinx theme for documentation diff --git a/tests/test_models/test_imputers.py b/tests/test_models/test_imputers.py index 05b2378..e9f87f7 100644 --- a/tests/test_models/test_imputers.py +++ b/tests/test_models/test_imputers.py @@ -78,6 +78,14 @@ def data_with_edge_cases() -> pd.DataFrame: except ImportError: pass +try: + from microimpute.models.mdn import MDN + + ALL_IMPUTER_MODELS.append(MDN) + CATEGORICAL_MODELS.append(MDN) +except ImportError: + pass + # === Basic Interface Tests === diff --git a/tests/test_models/test_mdn.py b/tests/test_models/test_mdn.py new file mode 100644 index 0000000..ea74842 --- /dev/null +++ b/tests/test_models/test_mdn.py @@ -0,0 +1,663 @@ +"""Tests for the MDN (Mixture Density Network) imputation model.""" + +import os +import shutil +import tempfile +from typing import Dict, List + +import numpy as np +import pandas as pd +import pytest + +from microimpute.utils.data import preprocess_data + +# Skip all tests if pytorch-tabular is not available +pytest.importorskip("pytorch_tabular") + +from microimpute.models.mdn import ( + MDN, + _generate_cache_key, + _generate_data_hash, +) + + +# === Fixtures === + + +@pytest.fixture +def simple_numeric_data() -> pd.DataFrame: + """Create simple synthetic numeric data for testing.""" + np.random.seed(42) + n_samples = 200 + + x1 = np.random.randn(n_samples) + x2 = np.random.randn(n_samples) + # Target with known relationship to predictors + y = 2 * x1 + 0.5 * x2 + np.random.randn(n_samples) * 0.5 + + return pd.DataFrame({"x1": x1, "x2": x2, "y": y}) + + +@pytest.fixture +def mixed_type_data() -> pd.DataFrame: + """Create data with numeric, categorical, and boolean targets.""" + np.random.seed(42) + n_samples = 200 + + x1 = np.random.randn(n_samples) + x2 = np.random.randn(n_samples) + + # Numeric target + y_numeric = 2 * x1 + np.random.randn(n_samples) * 0.5 + + # Categorical target + categories = ["A", "B", "C"] + y_categorical = np.random.choice(categories, n_samples) + + # Boolean target + y_boolean = (x1 + x2 > 0).astype(bool) + + return pd.DataFrame( + { + "x1": x1, + "x2": x2, + "y_numeric": y_numeric, + "y_categorical": y_categorical, + "y_boolean": y_boolean, + } + ) + + +@pytest.fixture +def temp_model_dir(): + """Create a temporary directory for model caching tests.""" + temp_dir = tempfile.mkdtemp() + yield temp_dir + # Cleanup after test + shutil.rmtree(temp_dir, ignore_errors=True) + + +# === Basic Functionality Tests === + + +def test_mdn_basic_fit_predict(simple_numeric_data: pd.DataFrame) -> None: + """Test basic MDN model fitting and prediction for numeric data.""" + predictors = ["x1", "x2"] + imputed_variables = ["y"] + + X_train, X_test = preprocess_data(simple_numeric_data) + + # Initialize with small network for fast testing + model = MDN( + layers="32-16", + num_gaussian=3, + max_epochs=5, + batch_size=32, + ) + fitted_model = model.fit(X_train, predictors, imputed_variables) + + # Predict without specifying quantiles (should return single DataFrame) + predictions = fitted_model.predict(X_test) + + assert isinstance(predictions, pd.DataFrame) + assert predictions.shape == (len(X_test), len(imputed_variables)) + assert not predictions.isna().any().any() + + +def test_mdn_multiple_quantiles(simple_numeric_data: pd.DataFrame) -> None: + """Test MDN prediction with multiple quantile requests.""" + predictors = ["x1", "x2"] + imputed_variables = ["y"] + + X_train, X_test = preprocess_data(simple_numeric_data) + + model = MDN( + layers="32-16", + num_gaussian=3, + max_epochs=5, + batch_size=32, + ) + fitted_model = model.fit(X_train, predictors, imputed_variables) + + # Predict at multiple quantiles + quantiles = [0.25, 0.5, 0.75] + predictions = fitted_model.predict(X_test, quantiles=quantiles) + + assert isinstance(predictions, dict) + assert set(predictions.keys()) == set(quantiles) + + for q, pred_df in predictions.items(): + assert pred_df.shape == (len(X_test), len(imputed_variables)) + assert not pred_df.isna().any().any() + + +def test_mdn_stochastic_sampling(simple_numeric_data: pd.DataFrame) -> None: + """Test that MDN produces stochastic (different) samples.""" + predictors = ["x1", "x2"] + imputed_variables = ["y"] + + X_train, X_test = preprocess_data(simple_numeric_data) + + model = MDN( + layers="32-16", + num_gaussian=3, + max_epochs=5, + batch_size=32, + ) + fitted_model = model.fit(X_train, predictors, imputed_variables) + + # Get two sets of predictions + pred1 = fitted_model.predict(X_test) + pred2 = fitted_model.predict(X_test) + + # Due to stochastic sampling, predictions should differ + # (unless seed makes them identical within the model) + # At minimum, verify both are valid + assert pred1.shape == pred2.shape + assert not pred1.isna().any().any() + assert not pred2.isna().any().any() + + +# === Mixed Variable Type Tests === + + +def test_mdn_categorical_target(mixed_type_data: pd.DataFrame) -> None: + """Test MDN with categorical target variable.""" + predictors = ["x1", "x2"] + imputed_variables = ["y_categorical"] + + X_train, X_test = preprocess_data(mixed_type_data) + + model = MDN( + layers="32-16", + max_epochs=5, + batch_size=32, + ) + + # Should log warning about neural classifier + fitted_model = model.fit(X_train, predictors, imputed_variables) + + predictions = fitted_model.predict(X_test) + + assert isinstance(predictions, pd.DataFrame) + assert len(predictions) == len(X_test) + # Predictions should be from original categories + assert set(predictions["y_categorical"].unique()).issubset({"A", "B", "C"}) + + +def test_mdn_boolean_target(mixed_type_data: pd.DataFrame) -> None: + """Test MDN with boolean target variable.""" + predictors = ["x1", "x2"] + imputed_variables = ["y_boolean"] + + X_train, X_test = preprocess_data(mixed_type_data) + + model = MDN( + layers="32-16", + max_epochs=5, + batch_size=32, + ) + + fitted_model = model.fit(X_train, predictors, imputed_variables) + + predictions = fitted_model.predict(X_test) + + assert isinstance(predictions, pd.DataFrame) + assert len(predictions) == len(X_test) + # Predictions should be boolean + assert predictions["y_boolean"].dtype == bool + + +def test_mdn_mixed_targets(mixed_type_data: pd.DataFrame) -> None: + """Test MDN with mixed numeric, categorical, and boolean targets.""" + predictors = ["x1", "x2"] + imputed_variables = ["y_numeric", "y_categorical", "y_boolean"] + + X_train, X_test = preprocess_data(mixed_type_data) + + model = MDN( + layers="32-16", + num_gaussian=3, + max_epochs=5, + batch_size=32, + ) + + fitted_model = model.fit(X_train, predictors, imputed_variables) + + predictions = fitted_model.predict(X_test) + + assert isinstance(predictions, pd.DataFrame) + assert set(predictions.columns) == set(imputed_variables) + assert len(predictions) == len(X_test) + assert not predictions.isna().any().any() + + +# === Model Caching Tests === + + +def test_mdn_model_caching( + simple_numeric_data: pd.DataFrame, temp_model_dir: str +) -> None: + """Test that MDN models are cached and reloaded correctly.""" + predictors = ["x1", "x2"] + imputed_variables = ["y"] + + X_train, X_test = preprocess_data(simple_numeric_data) + + # First fit - should train and save model + model1 = MDN( + layers="32-16", + num_gaussian=3, + max_epochs=5, + batch_size=32, + model_dir=temp_model_dir, + ) + fitted_model1 = model1.fit(X_train, predictors, imputed_variables) + predictions1 = fitted_model1.predict(X_test) + + # Second fit with same data - should load from cache + model2 = MDN( + layers="32-16", + num_gaussian=3, + max_epochs=5, + batch_size=32, + model_dir=temp_model_dir, + ) + fitted_model2 = model2.fit(X_train, predictors, imputed_variables) + predictions2 = fitted_model2.predict(X_test) + + # Both should produce valid predictions + assert predictions1.shape == predictions2.shape + assert not predictions1.isna().any().any() + assert not predictions2.isna().any().any() + + # Check that cache directory was created (at least one subdirectory) + assert len(os.listdir(temp_model_dir)) > 0 + + +def test_mdn_force_retrain( + simple_numeric_data: pd.DataFrame, temp_model_dir: str +) -> None: + """Test that force_retrain bypasses cache.""" + predictors = ["x1", "x2"] + imputed_variables = ["y"] + + X_train, X_test = preprocess_data(simple_numeric_data) + + # First fit + model1 = MDN( + layers="32-16", + num_gaussian=3, + max_epochs=5, + batch_size=32, + model_dir=temp_model_dir, + ) + model1.fit(X_train, predictors, imputed_variables) + + # Second fit with force_retrain - should train again + model2 = MDN( + layers="32-16", + num_gaussian=3, + max_epochs=5, + batch_size=32, + model_dir=temp_model_dir, + force_retrain=True, + ) + fitted_model2 = model2.fit(X_train, predictors, imputed_variables) + predictions = fitted_model2.predict(X_test) + + assert not predictions.isna().any().any() + + +def test_mdn_different_data_different_cache( + simple_numeric_data: pd.DataFrame, temp_model_dir: str +) -> None: + """Test that different datasets create different cache entries.""" + predictors = ["x1", "x2"] + imputed_variables = ["y"] + + X_train, X_test = preprocess_data(simple_numeric_data) + + # Fit with original data + model1 = MDN( + layers="32-16", + num_gaussian=3, + max_epochs=5, + batch_size=32, + model_dir=temp_model_dir, + ) + model1.fit(X_train, predictors, imputed_variables) + + # Count cache entries + cache_count_1 = len(os.listdir(temp_model_dir)) + + # Create modified data (different values) + modified_data = simple_numeric_data.copy() + modified_data["y"] = modified_data["y"] * 2 + X_train_mod, _ = preprocess_data(modified_data) + + # Fit with modified data - should create new cache entry + model2 = MDN( + layers="32-16", + num_gaussian=3, + max_epochs=5, + batch_size=32, + model_dir=temp_model_dir, + ) + model2.fit(X_train_mod, predictors, imputed_variables) + + # Should have more cache entries now + cache_count_2 = len(os.listdir(temp_model_dir)) + assert cache_count_2 > cache_count_1 + + +# === Edge Cases === + + +def test_mdn_constant_target(simple_numeric_data: pd.DataFrame) -> None: + """Test MDN with constant target variable.""" + data = simple_numeric_data.copy() + data["y_constant"] = 5.0 # Constant value + + predictors = ["x1", "x2"] + imputed_variables = ["y_constant"] + + X_train, X_test = preprocess_data(data) + + model = MDN( + layers="32-16", + max_epochs=5, + batch_size=32, + ) + fitted_model = model.fit(X_train, predictors, imputed_variables) + + predictions = fitted_model.predict(X_test) + + # All predictions should be the constant value + assert np.allclose(predictions["y_constant"].values, 5.0) + + +def test_mdn_single_observation() -> None: + """Test MDN prediction with single observation.""" + np.random.seed(42) + + # Create small training data + train_data = pd.DataFrame( + { + "x1": np.random.randn(50), + "x2": np.random.randn(50), + "y": np.random.randn(50), + } + ) + + # Single test observation + test_data = pd.DataFrame({"x1": [0.5], "x2": [-0.3], "y": [0.0]}) + + model = MDN( + layers="16-8", + num_gaussian=2, + max_epochs=3, + batch_size=16, + ) + fitted_model = model.fit(train_data, ["x1", "x2"], ["y"]) + + predictions = fitted_model.predict(test_data) + + assert len(predictions) == 1 + assert not predictions.isna().any().any() + + +# === Configuration Tests === + + +def test_mdn_custom_configuration() -> None: + """Test MDN with custom configuration parameters.""" + np.random.seed(42) + + data = pd.DataFrame( + { + "x1": np.random.randn(100), + "x2": np.random.randn(100), + "y": np.random.randn(100), + } + ) + + X_train, X_test = preprocess_data(data) + + # Custom configuration + model = MDN( + layers="64-32-16", # Deeper network + activation="LeakyReLU", + dropout=0.2, + use_batch_norm=True, + num_gaussian=10, # More mixture components + softmax_temperature=0.5, + learning_rate=5e-4, + max_epochs=3, + batch_size=16, + ) + + fitted_model = model.fit(X_train, ["x1", "x2"], ["y"]) + predictions = fitted_model.predict(X_test) + + assert not predictions.isna().any().any() + + +def test_generate_cache_key(): + """Test cache key generation.""" + data_hash = "abc123def456" + + key1 = _generate_cache_key(["a", "b", "c"], "target", data_hash) + key2 = _generate_cache_key( + ["c", "b", "a"], "target", data_hash + ) # Different predictor order + key3 = _generate_cache_key(["a", "b", "c"], "target", "different_hash") + + # Same predictors (regardless of order) should produce same key + assert key1 == key2 + + # Different data hash should produce different key + assert key1 != key3 + + +def test_generate_data_hash(): + """Test data hash generation.""" + np.random.seed(42) + + # Create test data + X = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]}) + y = pd.Series([7, 8, 9], name="target") + + # Same data should produce same hash + hash1 = _generate_data_hash(X, y) + hash2 = _generate_data_hash(X.copy(), y.copy()) + assert hash1 == hash2 + + # Different data should produce different hash + X_different = pd.DataFrame({"a": [1, 2, 4], "b": [4, 5, 6]}) + hash3 = _generate_data_hash(X_different, y) + assert hash1 != hash3 + + +# === Return Probabilities Test === + + +def test_mdn_return_probs(mixed_type_data: pd.DataFrame) -> None: + """Test MDN returning probability distributions for categorical variables.""" + predictors = ["x1", "x2"] + imputed_variables = ["y_categorical"] + + X_train, X_test = preprocess_data(mixed_type_data) + + model = MDN( + layers="32-16", + max_epochs=5, + batch_size=32, + ) + fitted_model = model.fit(X_train, predictors, imputed_variables) + + # Predict with return_probs=True + predictions = fitted_model.predict( + X_test, quantiles=[0.5], return_probs=True + ) + + assert "probabilities" in predictions + assert "y_categorical" in predictions["probabilities"] + + prob_info = predictions["probabilities"]["y_categorical"] + assert "probabilities" in prob_info + assert "classes" in prob_info + + # Probabilities should sum to 1 + probs = prob_info["probabilities"] + assert np.allclose(probs.sum(axis=1), 1.0) + + +# === Hyperparameter Tuning Tests === + + +def test_mdn_hyperparameter_tuning_numeric( + simple_numeric_data: pd.DataFrame, +) -> None: + """Test MDN hyperparameter tuning with numeric targets.""" + predictors = ["x1", "x2"] + imputed_variables = ["y"] + + X_train, X_test = preprocess_data(simple_numeric_data) + + model = MDN( + layers="16-8", + max_epochs=5, # Short for testing + batch_size=32, + ) + + # Fit with tuning enabled + result = model.fit( + X_train, predictors, imputed_variables, tune_hyperparameters=True + ) + + # Should return tuple (MDNResults, best_params) + assert isinstance(result, tuple) + assert len(result) == 2 + + fitted_model, best_params = result + + # Check best_params structure for numeric-only case + assert isinstance(best_params, dict) + assert "num_gaussian" in best_params + assert "learning_rate" in best_params + + # Verify tuned params are within expected ranges + assert 2 <= best_params["num_gaussian"] <= 10 + assert 1e-4 <= best_params["learning_rate"] <= 1e-2 + + # Verify model can still predict + predictions = fitted_model.predict(X_test) + assert not predictions.isna().any().any() + + +def test_mdn_hyperparameter_tuning_categorical( + mixed_type_data: pd.DataFrame, +) -> None: + """Test MDN hyperparameter tuning with categorical targets.""" + predictors = ["x1", "x2"] + imputed_variables = ["y_categorical"] + + X_train, X_test = preprocess_data(mixed_type_data) + + model = MDN( + layers="16-8", + max_epochs=5, # Short for testing + batch_size=32, + ) + + # Fit with tuning enabled + result = model.fit( + X_train, predictors, imputed_variables, tune_hyperparameters=True + ) + + # Should return tuple (MDNResults, best_params) + assert isinstance(result, tuple) + assert len(result) == 2 + + fitted_model, best_params = result + + # Check best_params structure for categorical-only case + assert isinstance(best_params, dict) + assert "learning_rate" in best_params + + # Verify tuned params are within expected ranges + assert 1e-4 <= best_params["learning_rate"] <= 1e-2 + + # Verify model can still predict + predictions = fitted_model.predict(X_test) + assert not predictions.isna().any().any() + + +def test_mdn_hyperparameter_tuning_mixed( + mixed_type_data: pd.DataFrame, +) -> None: + """Test MDN hyperparameter tuning with mixed targets.""" + predictors = ["x1", "x2"] + imputed_variables = ["y_numeric", "y_categorical"] + + X_train, X_test = preprocess_data(mixed_type_data) + + model = MDN( + layers="16-8", + max_epochs=5, # Short for testing + batch_size=32, + ) + + # Fit with tuning enabled + result = model.fit( + X_train, predictors, imputed_variables, tune_hyperparameters=True + ) + + # Should return tuple (MDNResults, best_params) + assert isinstance(result, tuple) + assert len(result) == 2 + + fitted_model, best_params = result + + # Check best_params structure for mixed case + assert isinstance(best_params, dict) + assert "mdn" in best_params + assert "classifier" in best_params + + # Verify MDN params + assert "num_gaussian" in best_params["mdn"] + assert "learning_rate" in best_params["mdn"] + + # Verify classifier params + assert "learning_rate" in best_params["classifier"] + + # Verify model can still predict + predictions = fitted_model.predict(X_test) + assert not predictions.isna().any().any() + + +def test_mdn_without_tuning_returns_single_result( + simple_numeric_data: pd.DataFrame, +) -> None: + """Test that MDN without tuning returns just MDNResults (not tuple).""" + predictors = ["x1", "x2"] + imputed_variables = ["y"] + + X_train, X_test = preprocess_data(simple_numeric_data) + + model = MDN( + layers="16-8", + max_epochs=3, + batch_size=32, + ) + + # Fit without tuning + result = model.fit(X_train, predictors, imputed_variables) + + # Should return MDNResults directly, not a tuple + assert not isinstance(result, tuple) + + # Verify model can predict + predictions = result.predict(X_test) + assert not predictions.isna().any().any()