From b6b3b0d5708a97abf29fa649ffc2ba4581864826 Mon Sep 17 00:00:00 2001 From: Kate Lawrence Date: Fri, 5 Dec 2025 08:09:19 -0800 Subject: [PATCH 1/4] Add comprehensive file existence checks with clear error messages - Check for missing phenotype, covariate, genotype, sumstat, column mapping, and LD metadata files - Error (not warn) when files are missing to catch path issues early - Fix genotype file check to look for .bed/.bim/.fam or .pgen/.pvar/.psam files instead of base filename - Improve error messages to show which files are missing with condition names - Filter phenotypes without data for specific region before loading --- R/colocboost_pipeline.R | 188 +++++++++++++++++++++++++++++++++++++++- R/file_utils.R | 30 +++++-- 2 files changed, 209 insertions(+), 9 deletions(-) diff --git a/R/colocboost_pipeline.R b/R/colocboost_pipeline.R index 2b22d712..709304b3 100644 --- a/R/colocboost_pipeline.R +++ b/R/colocboost_pipeline.R @@ -111,16 +111,143 @@ load_multitask_regional_data <- function(region, # a string of chr:start-end for for (i_data in 1:n_dataset) { # extract genotype file name genotype <- genotype_list[i_data] + + # Check if PLINK files exist (.bed/.bim/.fam for PLINK 1 or .pgen/.pvar/.psam for PLINK 2) + bed_file <- paste0(genotype, ".bed") + bim_file <- paste0(genotype, ".bim") + fam_file <- paste0(genotype, ".fam") + pgen_file <- paste0(genotype, ".pgen") + pvar_file <- paste0(genotype, ".pvar") + psam_file <- paste0(genotype, ".psam") + + plink1_exists <- file.exists(bed_file) && file.exists(bim_file) && file.exists(fam_file) + plink2_exists <- file.exists(pgen_file) && file.exists(pvar_file) && file.exists(psam_file) + + if (!plink1_exists && !plink2_exists) { + missing_files <- c() + if (!file.exists(bed_file)) missing_files <- c(missing_files, bed_file) + if (!file.exists(bim_file)) missing_files <- c(missing_files, bim_file) + if (!file.exists(fam_file)) missing_files <- c(missing_files, fam_file) + if (!file.exists(pgen_file)) missing_files <- c(missing_files, pgen_file) + if (!file.exists(pvar_file)) missing_files <- c(missing_files, pvar_file) + if (!file.exists(psam_file)) missing_files <- c(missing_files, psam_file) + + stop("Genotype files do not exist for dataset ", i_data, ".\n", + " Base path: ", genotype, "\n", + " Missing files: ", paste(missing_files, collapse = ", "), + call. = FALSE) + } + # extract phenotype and covariate file names pos <- which(match_geno_pheno == i_data) phenotype <- phenotype_list[pos] covariate <- covariate_list[pos] conditions <- conditions_list_individual[pos] + + # Check for missing files first and provide clear errors + missing_pheno <- !vapply(phenotype, file.exists, logical(1)) + missing_covar <- !vapply(covariate, file.exists, logical(1)) + + if (any(missing_pheno)) { + missing_pheno_files <- phenotype[missing_pheno] + missing_pheno_conditions <- conditions[missing_pheno] + stop("Phenotype file(s) do not exist:\n", + paste(paste0(" - ", missing_pheno_conditions, ": ", missing_pheno_files), collapse = "\n"), + call. = FALSE) + } + + if (any(missing_covar)) { + missing_covar_files <- covariate[missing_covar] + missing_covar_conditions <- conditions[missing_covar] + stop("Covariate file(s) do not exist:\n", + paste(paste0(" - ", missing_covar_conditions, ": ", missing_covar_files), collapse = "\n"), + call. = FALSE) + } + + # All files should exist at this point (errors would have been thrown) + + # Helper function to check if phenotype file has data for the specific region + # Note: The file may have data elsewhere, but we only care about this region + # This is a pre-check; the actual loader will do the definitive check + check_phenotype_has_data <- function(pheno_file, region_str) { + # File existence already checked above, so we can assume it exists here + # Parse region to get chromosome and coordinates + region_parts <- strsplit(region_str, ":", fixed = TRUE)[[1]] + if (length(region_parts) != 2) { + return(TRUE) # If region format is wrong, let the actual load function handle it + } + chr <- region_parts[1] + coord_parts <- strsplit(region_parts[2], "-", fixed = TRUE)[[1]] + if (length(coord_parts) != 2) { + return(TRUE) # If region format is wrong, let the actual load function handle it + } + start <- coord_parts[1] + end <- coord_parts[2] + + # Try both formats: with and without 'chr' prefix + # Different files may be indexed with different formats + region_formats <- c( + region_str, # Try original format first (e.g., "chr1:0-1000000") + paste0(sub("^chr", "", chr), ":", start, "-", end) # Try without chr (e.g., "1:0-1000000") + ) + + # Use tabix to check if file has data for this specific region + # Use the same approach as tabix_region: try fread with tabix command + for (region_tabix in region_formats) { + # Try using fread with tabix command (same as tabix_region does) + cmd_output <- tryCatch( + { + data.table::fread(cmd = paste0("tabix -h ", pheno_file, " ", region_tabix), + sep = "auto", header = "auto") + }, + error = function(e) NULL + ) + + # If we got output with data, return TRUE + if (!is.null(cmd_output) && nrow(cmd_output) > 0) { + return(TRUE) + } + + # If fread failed, it might be because file isn't indexed or has other issues + # In that case, be lenient and let the actual loader handle it + # (The loader will give a proper error message if needed) + } + + # If we tried all formats and got no data, return FALSE + # But only if we successfully queried (didn't get errors) + # For now, if we reach here after trying both formats, assume no data + return(FALSE) + } + + # Filter to only phenotypes with data for this specific region + has_data <- vapply(phenotype, function(p) check_phenotype_has_data(p, region), logical(1)) + valid_indices <- which(has_data) + + if (length(valid_indices) == 0) { + warning("No phenotype files have data for region ", region, + " for genotype dataset ", i_data, ". Skipping this dataset.", call. = FALSE) + next + } + + if (length(valid_indices) < length(phenotype)) { + skipped_conditions <- conditions[!has_data] + warning("Filtered from ", length(phenotype), " to ", length(valid_indices), + " tissues with data in region ", region, + ". Skipped tissues (no data in this region): ", paste(skipped_conditions, collapse = ", "), + call. = FALSE) + } + + # Filter lists to only valid phenotypes + phenotype_filtered <- phenotype[valid_indices] + covariate_filtered <- covariate[valid_indices] + conditions_filtered <- conditions[valid_indices] + + # Load all valid phenotypes together dat <- load_regional_univariate_data( - genotype = genotype, phenotype = phenotype, - covariate = covariate, region = region, + genotype = genotype, phenotype = phenotype_filtered, + covariate = covariate_filtered, region = region, association_window = association_window, - conditions = conditions, xvar_cutoff = xvar_cutoff, + conditions = conditions_filtered, xvar_cutoff = xvar_cutoff, maf_cutoff = maf_cutoff, mac_cutoff = mac_cutoff, imiss_cutoff = imiss_cutoff, keep_indel = keep_indel, keep_samples = keep_samples, keep_variants = keep_variants, @@ -129,6 +256,7 @@ load_multitask_regional_data <- function(region, # a string of chr:start-end for region_name_col = region_name_col, scale_residuals = scale_residuals ) + if (is.null(individual_data)) { individual_data <- dat } else { @@ -150,11 +278,55 @@ load_multitask_regional_data <- function(region, # a string of chr:start-end for if (length(match_LD_sumstat) != length(LD_meta_file_path_list)) { stop("Please make sure 'match_LD_sumstat' matched 'LD_meta_file_path_list' if you load data from multiple sumstats.") } + + # Check for missing sumstat files + missing_sumstat <- !vapply(sumstat_path_list, file.exists, logical(1)) + if (any(missing_sumstat)) { + missing_sumstat_files <- sumstat_path_list[missing_sumstat] + missing_sumstat_conditions <- if (!is.null(conditions_list_sumstat)) { + conditions_list_sumstat[missing_sumstat] + } else { + paste0("sumstat_", which(missing_sumstat)) + } + stop("Summary statistics file(s) do not exist:\n", + paste(paste0(" - ", missing_sumstat_conditions, ": ", missing_sumstat_files), collapse = "\n"), + call. = FALSE) + } + + # Check for missing column mapping files + missing_column <- !vapply(column_file_path_list, file.exists, logical(1)) + if (any(missing_column)) { + missing_column_files <- column_file_path_list[missing_column] + missing_column_conditions <- if (!is.null(conditions_list_sumstat)) { + conditions_list_sumstat[missing_column] + } else { + paste0("sumstat_", which(missing_column)) + } + stop("Column mapping file(s) do not exist:\n", + paste(paste0(" - ", missing_column_conditions, ": ", missing_column_files), collapse = "\n"), + call. = FALSE) + } + + # Check for missing LD metadata files + missing_ld_meta <- !vapply(LD_meta_file_path_list, file.exists, logical(1)) + if (any(missing_ld_meta)) { + missing_ld_meta_files <- LD_meta_file_path_list[missing_ld_meta] + stop("LD metadata file(s) do not exist:\n", + paste(paste0(" - LD_meta_", which(missing_ld_meta), ": ", missing_ld_meta_files), collapse = "\n"), + call. = FALSE) + } + # - load sumstat data from multiple datasets n_LD <- length(match_LD_sumstat) for (i_ld in 1:n_LD) { # extract LD meta file path name LD_meta_file_path <- LD_meta_file_path_list[i_ld] + + # Error if LD metadata file doesn't exist (already checked above, but double-check) + if (!file.exists(LD_meta_file_path)) { + stop("LD metadata file does not exist for dataset ", i_ld, ": ", LD_meta_file_path, call. = FALSE) + } + LD_info <- load_LD_matrix(LD_meta_file_path, region = association_window, extract_coordinates = extract_coordinates @@ -165,6 +337,14 @@ load_multitask_regional_data <- function(region, # a string of chr:start-end for sumstats <- lapply(pos, function(ii) { sumstat_path <- sumstat_path_list[ii] column_file_path <- column_file_path_list[ii] + + # Error if files don't exist (already checked above, but double-check) + if (!file.exists(sumstat_path)) { + stop("Summary statistics file does not exist for ", conditions_list_sumstat[ii], ": ", sumstat_path, call. = FALSE) + } + if (!file.exists(column_file_path)) { + stop("Column mapping file does not exist for ", conditions_list_sumstat[ii], ": ", column_file_path, call. = FALSE) + } # FIXME later: when consider multiple LD reference tmp <- load_rss_data( sumstat_path = sumstat_path, column_file_path = column_file_path, @@ -179,6 +359,8 @@ load_multitask_regional_data <- function(region, # a string of chr:start-end for } return(tmp) }) + + # All files should exist at this point (errors would have been thrown) names(sumstats) <- conditions sumstat_data$sumstats <- c(sumstat_data$sumstats, list(sumstats)) sumstat_data$LD_info <- c(sumstat_data$LD_info, list(LD_info)) diff --git a/R/file_utils.R b/R/file_utils.R index ce3253ff..135c3a61 100644 --- a/R/file_utils.R +++ b/R/file_utils.R @@ -209,7 +209,7 @@ NoPhenotypeError <- function(message) { structure(list(message = message), class = c("NoPhenotypeError", "error", "condition")) } -#' @importFrom purrr map2 compact +#' @importFrom purrr map2 #' @importFrom readr read_delim cols #' @importFrom dplyr filter select mutate across everything #' @importFrom magrittr %>% @@ -224,8 +224,7 @@ load_phenotype_data <- function(phenotype_path, region, extract_region_name = NU } # Use `map2` to iterate over `phenotype_path` and `extract_region_name` simultaneously - # `compact` should remove all NULL element - phenotype_data <- compact(map2(phenotype_path, extract_region_name, ~ { + phenotype_data <- map2(phenotype_path, extract_region_name, ~ { tabix_data <- if (!is.null(region)) tabix_region(.x, region, tabix_header = tabix_header) else read_delim(.x, "\t", col_types = cols()) if (nrow(tabix_data) == 0) { message(paste("Phenotype file ", .x, " is empty for the specified region", if (!is.null(region)) "" else region)) @@ -245,10 +244,10 @@ load_phenotype_data <- function(phenotype_path, region, extract_region_name = NU } else { return(tabix_data %>% t()) } - })) + }) - # Check if all phenotype files are empty - if (length(phenotype_data) == 0) { + # Check if all phenotype files are empty (all elements NULL) + if (all(vapply(phenotype_data, is.null, logical(1)))) { stop(NoPhenotypeError(paste("All phenotype files are empty for the specified region", if (!is.null(region)) "" else region))) } return(phenotype_data) @@ -456,6 +455,25 @@ load_regional_association_data <- function(genotype, # PLINK file ## Load phenotype and covariates and perform some pre-processing covar <- load_covariate_data(covariate) pheno <- load_phenotype_data(phenotype, region, extract_region_name = extract_region_name, region_name_col = region_name_col, tabix_header = tabix_header) + + # Keep covariates / conditions aligned with successfully loaded phenotypes. + # load_phenotype_data() returns NULL for phenotype files that are empty in + # the requested region; we must drop the corresponding covariate / condition + # entries before constructing data_list. + keep_idx <- !vapply(pheno, is.null, logical(1)) + if (!any(keep_idx)) { + stop(NoPhenotypeError(paste("All phenotype files are empty for the specified region", if (!is.null(region)) "" else region))) + } + if (any(!keep_idx)) { + dropped_conditions <- conditions[!keep_idx] + message(paste( + "Dropping phenotypes with no data in region", region, ":", + paste(dropped_conditions, collapse = ", ") + )) + } + pheno <- pheno[keep_idx] + covar <- covar[keep_idx] + conditions <- conditions[keep_idx] ### including Y ( cov ) and specific X and covar match, filter X variants based on the overlapped samples. data_list <- prepare_data_list(geno, pheno, covar, imiss_cutoff, maf_cutoff, mac_cutoff, xvar_cutoff, From 6b7c77b650c583f7cb88c2a2883ceb5ab8a710c9 Mon Sep 17 00:00:00 2001 From: Kate Lawrence Date: Mon, 29 Dec 2025 18:07:14 -0800 Subject: [PATCH 2/4] Fix variant ID normalization and add error handling - Centralize normalize_variant_id function in misc.R and remove duplicates - Normalize variant IDs consistently across sumstats, LD matrices, and genotype data - Fix LD matrix usage: use processed LD matrices after normalization in summary_stats_qc - Normalize ref_panel variant IDs before imputation to match processed sumstats - Add try-catch error handling in summary_stats_qc_multitask loop - Add try-catch error handling in separate_gwas focal analysis loop - Fix pip_cutoff_to_skip_sumstat handling for single values, named vectors, and unnamed vectors - Remove debug statements --- R/allele_qc.R | 17 ++++- R/colocboost_pipeline.R | 145 +++++++++++++++++++++++++++------------- R/file_utils.R | 43 +++++++++--- R/misc.R | 56 +++++++++++++++- R/sumstats_qc.R | 21 ++++++ R/univariate_pipeline.R | 11 ++- 6 files changed, 233 insertions(+), 60 deletions(-) diff --git a/R/allele_qc.R b/R/allele_qc.R index 7b32d0a8..9cec52ac 100644 --- a/R/allele_qc.R +++ b/R/allele_qc.R @@ -76,22 +76,29 @@ allele_qc <- function(target_data, ref_variants, col_to_flip = NULL, } else { target_data <- variant_id_to_df(target_data) } + ref_variants <- variant_id_to_df(ref_variants) + columns_to_remove <- c("chromosome", "position", "ref", "alt", "variant_id") # Check if any of the specified columns are present if (any(columns_to_remove %in% colnames(target_data))) { target_data <- select(target_data, -any_of(columns_to_remove)) } + match_result <- merge(target_data, ref_variants, by = c("chrom", "pos"), all = FALSE, suffixes = c(".target", ".ref")) %>% as.data.frame() if (nrow(match_result) == 0) { warning("No matching variants found between target data and reference variants.") return(list(target_data_qced = match_result, qc_summary = match_result)) } - # match target & ref by chrom and position + + # match target & ref by chrom and position match_result = match_result %>% mutate(variants_id_original = paste(chrom, pos, A2.target, A1.target, sep = ":")) %>% - mutate(variants_id_qced = paste(chrom, pos, A2.ref, A1.ref, sep = ":")) %>% + mutate(variants_id_qced = paste(chrom, pos, A2.ref, A1.ref, sep = ":")) + + + match_result = match_result %>% # filter out totally same rows. filter(duplicated(.) | !duplicated(.)) %>% # upper case target/reference A1 A2 @@ -175,12 +182,18 @@ allele_qc <- function(target_data, ref_variants, col_to_flip = NULL, } } + # Normalize variant_id to chr{chrom}:{pos}:{A2}:{A1} format + result$variant_id <- normalize_variant_id(result$variant_id) + if (!remove_unmatched) { match_variant <- result %>% pull(variants_id_original) match_result <- select(match_result, -(flip1.ref:keep)) %>% select(-variants_id_original, -A1.target, -A2.target) %>% rename(A1 = A1.ref, A2 = A2.ref, variant_id = variants_id_qced) + # Normalize variant_id to chr{chrom}:{pos}:{A2}:{A1} format + match_result$variant_id <- normalize_variant_id(match_result$variant_id) target_data <- target_data %>% mutate(variant_id = paste(chrom, pos, A2, A1, sep = ":")) + target_data$variant_id <- normalize_variant_id(target_data$variant_id) if (length(setdiff(target_data %>% pull(variant_id), match_variant)) > 0) { unmatch_data <- target_data %>% filter(!variant_id %in% match_variant) result <- rbind(result, unmatch_data %>% mutate(variants_id_original = variant_id)) diff --git a/R/colocboost_pipeline.R b/R/colocboost_pipeline.R index 709304b3..c80f5a79 100644 --- a/R/colocboost_pipeline.R +++ b/R/colocboost_pipeline.R @@ -683,12 +683,17 @@ colocboost_analysis_pipeline <- function(region_data, message(paste("====== Performing focaled version GWAS-xQTL ColocBoost on", length(Y), "contexts and ", current_study, "GWAS. =====")) dict <- dict_sumstatLD[i_gwas, ] traits <- c(names(Y), current_study) - res_gwas_separate[[current_study]] <- colocboost( - X = X, Y = Y, sumstat = sumstats[dict[1]], - LD = LD_mat[dict[2]], dict_YX = dict_YX, - outcome_names = traits, focal_outcome_idx = length(traits), - output_level = 2, ... - ) + res_gwas_separate[[current_study]] <- tryCatch({ + colocboost( + X = X, Y = Y, sumstat = sumstats[dict[1]], + LD = LD_mat[dict[2]], dict_YX = dict_YX, + outcome_names = traits, focal_outcome_idx = length(traits), + output_level = 2, ... + ) + }, error = function(e) { + message(paste("Error in focaled ColocBoost for", current_study, ":", conditionMessage(e))) + return(NULL) + }) } t32 <- Sys.time() analysis_results$separate_gwas <- res_gwas_separate @@ -744,9 +749,18 @@ qc_regional_data <- function(region_data, if (is.null(y)) { return(NULL) } + if (is.null(colnames(y))) { - colnames(y) <- names(res_Y)[iy] + # Fallback: if colnames (gene IDs) are missing, create numbered colnames + if (is.matrix(y) && ncol(y) > 1) { + new_colnames <- paste0(names(res_Y)[iy], "_", 1:ncol(y)) + colnames(y) <- new_colnames + warning("No colnames found for condition '", names(res_Y)[iy], "', creating numbered colnames for matrix with ", ncol(y), " columns") + } else { + colnames(y) <- names(res_Y)[iy] + } } else { + # Preserve existing colnames (gene IDs) and add condition prefix colnames(y) <- paste0(names(res_Y)[iy], "_", colnames(y)) } return(y) @@ -869,47 +883,88 @@ qc_regional_data <- function(region_data, n <- sumstat$n var_y <- sumstat$var_y conditions_sumstat <- names(sumstats)[ii] - pip_cutoff_to_skip_ld <- pip_cutoff_to_skip_sumstat[conditions_sumstat] %>% as.numeric() - - # Preprocess the input data - preprocess_results <- rss_basic_qc(sumstat$sumstats, LD_data, remove_indels = remove_indels) - sumstat$sumstats <- preprocess_results$sumstats - LD_mat <- preprocess_results$LD_mat - - # initial PIP checking - if (pip_cutoff_to_skip_ld != 0) { - pip <- susie_rss_wrapper(z = sumstat$sumstats$z, R = LD_mat, L = 1, n = n, var_y = var_y)$pip - if (pip_cutoff_to_skip_ld < 0) { - # automatically determine the cutoff to use - pip_cutoff_to_skip_ld <- 3 * 1 / nrow(LD_mat) - } - if (!any(pip > pip_cutoff_to_skip_ld)) { - message(paste( - "Skipping follow-up analysis for sumstat study", conditions_sumstat, - ". No signals above PIP threshold", pip_cutoff_to_skip_ld, "in initial model screening." - )) - next + + # Wrap processing in try-catch to handle errors for individual sumstats + result <- tryCatch({ + # Handle pip_cutoff_to_skip_sumstat: can be a single value, named vector, or unnamed vector + if (length(pip_cutoff_to_skip_sumstat) == 1 && is.null(names(pip_cutoff_to_skip_sumstat))) { + # Single value: use for all conditions + pip_cutoff_to_skip_ld <- as.numeric(pip_cutoff_to_skip_sumstat) + } else if (!is.null(names(pip_cutoff_to_skip_sumstat)) && conditions_sumstat %in% names(pip_cutoff_to_skip_sumstat)) { + # Named vector: lookup by condition name + pip_cutoff_to_skip_ld <- as.numeric(pip_cutoff_to_skip_sumstat[conditions_sumstat]) + } else if (length(pip_cutoff_to_skip_sumstat) >= ii) { + # Unnamed vector: use positional indexing + pip_cutoff_to_skip_ld <- as.numeric(pip_cutoff_to_skip_sumstat[ii]) } else { - message(paste("Keep summary study", conditions_sumstat, ".")) + # Default to 0 if no match + pip_cutoff_to_skip_ld <- 0 + } + + # Handle NA values (e.g., from failed lookup) + if (is.na(pip_cutoff_to_skip_ld)) { + pip_cutoff_to_skip_ld <- 0 } - } - # Perform quality control - remove - if (!is.null(qc_method)) { - qc_results <- summary_stats_qc(sumstat$sumstats, LD_data, n = n, var_y = var_y, method = qc_method) - sumstat$sumstats <- qc_results$sumstats - LD_mat <- qc_results$LD_mat - } - # Perform imputation - if (impute) { - LD_matrix <- partition_LD_matrix(LD_data) - impute_results <- raiss(LD_data$ref_panel, sumstat$sumstats, LD_matrix, - rcond = impute_opts$rcond, - R2_threshold = impute_opts$R2_threshold, minimum_ld = impute_opts$minimum_ld, lamb = impute_opts$lamb - ) - sumstat$sumstats <- impute_results$result_filter - LD_mat <- impute_results$LD_mat - } + # Preprocess the input data + preprocess_results <- rss_basic_qc(sumstat$sumstats, LD_data, remove_indels = remove_indels) + sumstat$sumstats <- preprocess_results$sumstats + LD_mat <- preprocess_results$LD_mat + + # initial PIP checking + if (pip_cutoff_to_skip_ld != 0) { + pip <- susie_rss_wrapper(z = sumstat$sumstats$z, R = LD_mat, L = 1, n = n, var_y = var_y)$pip + if (pip_cutoff_to_skip_ld < 0) { + # automatically determine the cutoff to use + pip_cutoff_to_skip_ld <- 3 * 1 / nrow(LD_mat) + } + if (!any(pip > pip_cutoff_to_skip_ld)) { + message(paste( + "Skipping follow-up analysis for sumstat study", conditions_sumstat, + ". No signals above PIP threshold", pip_cutoff_to_skip_ld, "in initial model screening." + )) + return(NULL) # Skip this sumstat + } else { + message(paste("Keep summary study", conditions_sumstat, ".")) + } + } + + # Perform quality control - remove + # Create a temporary LD_data structure with the processed LD matrix for summary_stats_qc + if (!is.null(qc_method)) { + LD_data_processed <- list(combined_LD_matrix = LD_mat) + qc_results <- summary_stats_qc(sumstat$sumstats, LD_data_processed, n = n, var_y = var_y, method = qc_method) + sumstat$sumstats <- qc_results$sumstats + LD_mat <- qc_results$LD_mat + } + # Perform imputation + if (impute) { + # Normalize ref_panel variant IDs to match processed sumstats + ref_panel_processed <- LD_data$ref_panel + if (!is.null(ref_panel_processed) && "variant_id" %in% colnames(ref_panel_processed)) { + ref_panel_processed$variant_id <- normalize_variant_id(ref_panel_processed$variant_id) + } + LD_matrix <- partition_LD_matrix(LD_data) + impute_results <- raiss(ref_panel_processed, sumstat$sumstats, LD_matrix, + rcond = impute_opts$rcond, + R2_threshold = impute_opts$R2_threshold, minimum_ld = impute_opts$minimum_ld, lamb = impute_opts$lamb + ) + sumstat$sumstats <- impute_results$result_filter + LD_mat <- impute_results$LD_mat + } + + # Return processed sumstat and LD_mat + return(list(sumstat = sumstat, LD_mat = LD_mat)) + }, error = function(e) { + message(paste("Error processing sumstat", conditions_sumstat, ":", conditionMessage(e))) + return(NULL) + }) + + # Skip if processing failed + if (is.null(result)) next + + sumstat <- result$sumstat + LD_mat <- result$LD_mat # - check if LD exist if (length(final_LD) == 0) { diff --git a/R/file_utils.R b/R/file_utils.R index 135c3a61..e40560c6 100644 --- a/R/file_utils.R +++ b/R/file_utils.R @@ -230,20 +230,29 @@ load_phenotype_data <- function(phenotype_path, region, extract_region_name = NU message(paste("Phenotype file ", .x, " is empty for the specified region", if (!is.null(region)) "" else region)) return(NULL) } - if (!is.null(.y) && is.vector(.y) && !is.null(region_name_col) && (region_name_col %% 1 == 0)) { + # Transpose phenotype data: rows become columns (samples), columns become rows (genes + metadata) + tabix_data_transposed <- if (!is.null(.y) && is.vector(.y) && !is.null(region_name_col) && (region_name_col %% 1 == 0)) { + # Filter by extract_region_name if provided if (region_name_col <= ncol(tabix_data)) { region_col_name <- colnames(tabix_data)[region_name_col] - tabix_data <- tabix_data %>% + tabix_data %>% filter(.data[[region_col_name]] %in% .y) %>% t() - colnames(tabix_data) <- tabix_data[region_name_col, ] - return(tabix_data) } else { stop("region_name_col is out of bounds for the number of columns in tabix_data.") } } else { - return(tabix_data %>% t()) + # No filtering, just transpose + tabix_data %>% t() } + + # Set colnames from region_name_col row (contains gene IDs) if region_name_col is provided + # After transposing, the gene ID column becomes a row at position region_name_col + if (!is.null(region_name_col) && (region_name_col %% 1 == 0) && region_name_col <= nrow(tabix_data_transposed)) { + colnames(tabix_data_transposed) <- tabix_data_transposed[region_name_col, ] + } + + return(tabix_data_transposed) }) # Check if all phenotype files are empty (all elements NULL) @@ -310,6 +319,7 @@ prepare_data_list <- function(geno_bed, phenotype, covariate, imiss_cutoff, maf_ maf_val <- max(maf_cutoff, mac_val) filtered_data <- filter_X(filtered_geno_bed, imiss_cutoff, maf_val, var_thresh = xvar_cutoff) colnames(filtered_data) <- format_variant_id(colnames(filtered_data)) # Format column names right after filtering + colnames(filtered_data) <- normalize_variant_id(colnames(filtered_data)) # Normalize to chr{chrom}:{pos}:{A2}:{A1} format filtered_data }) ) %>% @@ -335,6 +345,8 @@ prepare_X_matrix <- function(geno_bed, data_list, imiss_cutoff, maf_cutoff, mac_ # Apply further filtering on X X_filtered <- filter_X(X_filtered, imiss_cutoff, maf_val, xvar_cutoff) colnames(X_filtered) <- format_variant_id(colnames(X_filtered)) + # Normalize variant IDs to chr{chrom}:{pos}:{A2}:{A1} format + colnames(X_filtered) <- normalize_variant_id(colnames(X_filtered)) # To keep a log message variants <- as.data.frame(do.call(rbind, lapply(format_variant_id(colnames(X_filtered)), function(x) strsplit(x, ":")[[1]][1:2])), stringsAsFactors = FALSE) @@ -373,17 +385,30 @@ add_X_residuals <- function(data_list, scale_residuals = FALSE) { #' @noRd add_Y_residuals <- function(data_list, conditions, scale_residuals = FALSE) { # Compute residuals, their mean, and standard deviation, and add them to data_list + # Preserve colnames from original Y (gene IDs) through the residual computation data_list <- data_list %>% mutate( - lm_res = map2(Y, covar, ~ .lm.fit(x = cbind(1, .y), y = .x)$residuals %>% as.matrix()), + lm_res = map2(Y, covar, function(y, cov) { + res <- .lm.fit(x = cbind(1, cov), y = y)$residuals %>% as.matrix() + # Preserve colnames from original Y (gene IDs) + if (!is.null(colnames(y))) { + colnames(res) <- colnames(y) + } + return(res) + }), Y_resid_mean = map(lm_res, ~ apply(.x, 2, mean)), Y_resid_sd = map(lm_res, ~ apply(.x, 2, sd)), - Y_resid = map(lm_res, ~ { + Y_resid = map2(lm_res, Y, function(lm, y_orig) { if (scale_residuals) { - scale(.x) + res <- scale(lm) } else { - .x + res <- lm + } + # Preserve colnames from original Y (gene IDs) + if (!is.null(colnames(y_orig))) { + colnames(res) <- colnames(y_orig) } + return(res) }) ) diff --git a/R/misc.R b/R/misc.R index f4f18477..6947be91 100644 --- a/R/misc.R +++ b/R/misc.R @@ -238,6 +238,35 @@ format_variant_id <- function(names_vector) { gsub("_", ":", names_vector) } +# Normalize variant ID to format: chr{chrom}:{pos}:{A2}:{A1} +# Strips "chr" if present, removes build suffix, then adds "chr" prefix +normalize_variant_id <- function(variant_id) { + # Remove "chr" prefix if present + variant_id <- gsub("^chr", "", variant_id) + # Split by colon to check format + parts <- strsplit(variant_id, ":", fixed = TRUE) + # Reconstruct with "chr" prefix: chr{chrom}:{pos}:{A2}:{A1} + normalized <- sapply(parts, function(p) { + n_parts <- length(p) + if (n_parts >= 4) { + # Has 4+ parts: chrom:pos:A2:A1:build_suffix + # Remove build suffix (5th part onwards) and keep first 4 + paste0("chr", p[1], ":", p[2], ":", p[3], ":", p[4]) + } else if (n_parts == 3) { + # Has 3 parts: chrom:pos:A1 (missing A2) - this is an error case + # Return as-is with chr prefix but warn + paste0("chr", paste(p, collapse = ":")) + } else if (n_parts == 2) { + # Has 2 parts: chrom:pos (missing alleles) - this is an error case + paste0("chr", paste(p, collapse = ":")) + } else { + # Unexpected format, return as-is with chr prefix + paste0("chr", paste(p, collapse = ":")) + } + }) + return(normalized) +} + #' Converted Variant ID into a properly structured data frame #' @param variant_id A data frame or character vector representing variant IDs. #' Expected formats are a data frame with columns "chrom", "pos", "A1", "A2", @@ -263,8 +292,31 @@ variant_id_to_df <- function(variant_id) { create_dataframe <- function(string) { string <- gsub("_", ":", string) parts <- strsplit(string, ":", fixed = TRUE) - data <- data.frame(do.call(rbind, parts), stringsAsFactors = FALSE) - colnames(data) <- c("chrom", "pos", "A2", "A1") + # Check how many parts each variant ID has + n_parts <- sapply(parts, length) + n_parts_unique <- unique(n_parts) + + # Handle both 4-part (chr:pos:A2:A1) and 5-part (chr:pos:A2:A1:build) formats + if (all(n_parts == 4)) { + # All have 4 parts + data <- data.frame(do.call(rbind, parts), stringsAsFactors = FALSE) + colnames(data) <- c("chrom", "pos", "A2", "A1") + } else if (all(n_parts == 5)) { + # All have 5 parts - drop the build column + data <- data.frame(do.call(rbind, parts), stringsAsFactors = FALSE) + colnames(data) <- c("chrom", "pos", "A2", "A1", "build") + data <- data[, c("chrom", "pos", "A2", "A1"), drop = FALSE] + } else if (all(n_parts %in% c(4, 5))) { + # Mixed 4 and 5 parts - pad 4-part IDs with empty build, then drop build column + parts_padded <- lapply(parts, function(p) { + if (length(p) == 4) c(p, "") else p + }) + data <- data.frame(do.call(rbind, parts_padded), stringsAsFactors = FALSE) + colnames(data) <- c("chrom", "pos", "A2", "A1", "build") + data <- data[, c("chrom", "pos", "A2", "A1"), drop = FALSE] + } else { + stop("Variant IDs must have 4 or 5 parts (chr:pos:A2:A1 or chr:pos:A2:A1:build), but found parts: ", paste(n_parts_unique, collapse=", ")) + } # Ensure that 'chrom' values are integers data$chrom <- ifelse(grepl("^chr", data$chrom), as.integer(sub("^chr", "", data$chrom)), # Remove 'chr' and convert to integer diff --git a/R/sumstats_qc.R b/R/sumstats_qc.R index c0424fe3..d0b29c17 100644 --- a/R/sumstats_qc.R +++ b/R/sumstats_qc.R @@ -18,6 +18,7 @@ #' @importFrom tidyr separate #' @importFrom magrittr %>% #' @export + rss_basic_qc <- function(sumstats, LD_data, skip_region = NULL, remove_indels = FALSE) { # Check if required columns are present in sumstats required_cols <- c("chrom", "pos", "A1", "A2") @@ -54,6 +55,26 @@ rss_basic_qc <- function(sumstats, LD_data, skip_region = NULL, remove_indels = sumstats_processed <- allele_flip$target_data_qced %>% arrange(pos) + # Normalize variant IDs to chr{chrom}:{pos}:{A2}:{A1} format + sumstats_processed$variant_id <- normalize_variant_id(sumstats_processed$variant_id) + + # Normalize LD matrix rownames/colnames + ld_rownames <- rownames(LD_data$combined_LD_matrix) + ld_colnames <- colnames(LD_data$combined_LD_matrix) + if (!is.null(ld_rownames)) { + ld_rownames_normalized <- normalize_variant_id(ld_rownames) + rownames(LD_data$combined_LD_matrix) <- ld_rownames_normalized + } + if (!is.null(ld_colnames)) { + ld_colnames_normalized <- normalize_variant_id(ld_colnames) + colnames(LD_data$combined_LD_matrix) <- ld_colnames_normalized + } + + # Update combined_LD_variants to match + if (!is.null(LD_data$combined_LD_variants)) { + LD_data$combined_LD_variants <- normalize_variant_id(LD_data$combined_LD_variants) + } + LD_mat_processed <- LD_data$combined_LD_matrix[sumstats_processed$variant_id, sumstats_processed$variant_id, drop = FALSE] return(list(sumstats = sumstats_processed, LD_mat = LD_mat_processed)) diff --git a/R/univariate_pipeline.R b/R/univariate_pipeline.R index 7a577c0d..1cc58c6d 100644 --- a/R/univariate_pipeline.R +++ b/R/univariate_pipeline.R @@ -224,15 +224,22 @@ rss_analysis_pipeline <- function( } } # Perform quality control + # Create a temporary LD_data structure with the processed LD matrix for summary_stats_qc if (!is.null(qc_method)) { - qc_results <- summary_stats_qc(sumstats, LD_data, n = n, var_y = var_y, method = qc_method) + LD_data_processed <- list(combined_LD_matrix = LD_mat) + qc_results <- summary_stats_qc(sumstats, LD_data_processed, n = n, var_y = var_y, method = qc_method) sumstats <- qc_results$sumstats LD_mat <- qc_results$LD_mat } # Perform imputation if (impute) { + # Normalize ref_panel variant IDs to match processed sumstats + ref_panel_processed <- LD_data$ref_panel + if (!is.null(ref_panel_processed) && "variant_id" %in% colnames(ref_panel_processed)) { + ref_panel_processed$variant_id <- normalize_variant_id(ref_panel_processed$variant_id) + } LD_matrix <- partition_LD_matrix(LD_data) - impute_results <- raiss(LD_data$ref_panel, sumstats, LD_matrix, rcond = impute_opts$rcond, R2_threshold = impute_opts$R2_threshold, minimum_ld = impute_opts$minimum_ld, lamb = impute_opts$lamb) + impute_results <- raiss(ref_panel_processed, sumstats, LD_matrix, rcond = impute_opts$rcond, R2_threshold = impute_opts$R2_threshold, minimum_ld = impute_opts$minimum_ld, lamb = impute_opts$lamb) sumstats <- impute_results$result_filter LD_mat <- impute_results$LD_mat } From 3a8cd01832ab006261689820932b8f0c4040cd7c Mon Sep 17 00:00:00 2001 From: Kate Lawrence Date: Tue, 30 Dec 2025 10:04:34 -0800 Subject: [PATCH 3/4] Fix variant ID normalization and add error handling for colocboost analyses - Use normalize_variant_id() consistently for sumstats and LD matrices to handle variant ID format mismatches - Remove double 'chr' prefix issue and build suffix (e.g., :b38) mismatches - Add tryCatch error handling for joint_gwas and separate_gwas analyses - Add concise warning messages for colocboost validation failures - Fix sumstat_studies handling when sumstats list is empty --- R/colocboost_pipeline.R | 166 +++++++++++++++++++++++++--------------- 1 file changed, 106 insertions(+), 60 deletions(-) diff --git a/R/colocboost_pipeline.R b/R/colocboost_pipeline.R index c80f5a79..6f4c4c5a 100644 --- a/R/colocboost_pipeline.R +++ b/R/colocboost_pipeline.R @@ -514,14 +514,19 @@ colocboost_analysis_pipeline <- function(region_data, message("No individual data pass QC.") } if (!is.null(sumstat_data)) { - phenotypes$sumstat_studies <- names(sumstat_data$sumstats) + if (!is.null(sumstat_data$sumstats) && length(sumstat_data$sumstats) > 0) { + phenotypes$sumstat_studies <- names(sumstat_data$sumstats) + } else { + phenotypes$sumstat_studies <- character(0) + } sumstat_studies_init <- phenotypes_init$sumstat_studies if (length(sumstat_studies_init) == length(phenotypes$sumstat_studies)) { message("All sumstat studies pass QC steps.") } else { + skipped <- setdiff(sumstat_studies_init, phenotypes$sumstat_studies) message(paste( "Skipping follow-up analysis for sumstat studies", - paste(setdiff(sumstat_studies_init, phenotypes$sumstat_studies), collapse = ";"), "after QC." + paste(skipped, collapse = ";"), "after QC." )) } } else { @@ -618,13 +623,18 @@ colocboost_analysis_pipeline <- function(region_data, if (!is.null(sumstat_data$sumstats)) { sumstats <- lapply(sumstat_data$sumstats, function(ss) { z <- ss$sumstats$z - variant <- paste0("chr", ss$sumstats$variant_id) + # Normalize variant IDs to ensure consistent format: chr{chrom}:{pos}:{A2}:{A1} + # This handles cases where variant_id might not be normalized or has build suffixes + variant <- normalize_variant_id(ss$sumstats$variant_id) n <- ss$n data.frame("z" = z, "n" = n, "variant" = variant) }) names(sumstats) <- names(sumstat_data$sumstats) LD_mat <- lapply(sumstat_data$LD_mat, function(ld) { - colnames(ld) <- rownames(ld) <- paste0("chr", colnames(ld)) + # Normalize LD matrix colnames/rownames to remove build suffixes (e.g., :b38) + # and ensure consistent format: chr{chrom}:{pos}:{A2}:{A1} + ld_colnames <- normalize_variant_id(colnames(ld)) + colnames(ld) <- rownames(ld) <- ld_colnames return(ld) }) LD_match <- sumstat_data$LD_match @@ -664,12 +674,17 @@ colocboost_analysis_pipeline <- function(region_data, message(paste("====== Performing non-focaled version GWAS-xQTL ColocBoost on", length(Y), "contexts and", length(sumstats), "GWAS. =====")) t21 <- Sys.time() traits <- c(names(Y), names(sumstats)) - res_gwas <- colocboost( - X = X, Y = Y, sumstat = sumstats, LD = LD_mat, - dict_YX = dict_YX, dict_sumstatLD = dict_sumstatLD, - outcome_names = traits, focal_outcome_idx = NULL, - output_level = 2, ... - ) + res_gwas <- tryCatch({ + colocboost( + X = X, Y = Y, sumstat = sumstats, LD = LD_mat, + dict_YX = dict_YX, dict_sumstatLD = dict_sumstatLD, + outcome_names = traits, focal_outcome_idx = NULL, + output_level = 2, ... + ) + }, error = function(e) { + warning(paste("Error in joint GWAS ColocBoost:", conditionMessage(e))) + return(NULL) + }) t22 <- Sys.time() analysis_results$joint_gwas <- res_gwas analysis_results$computing_time$Analysis$joint_gwas <- t22 - t21 @@ -691,9 +706,12 @@ colocboost_analysis_pipeline <- function(region_data, output_level = 2, ... ) }, error = function(e) { - message(paste("Error in focaled ColocBoost for", current_study, ":", conditionMessage(e))) + warning(paste("Error in focaled ColocBoost for", current_study, ":", conditionMessage(e))) return(NULL) }) + if (is.null(res_gwas_separate[[current_study]])) { + warning(paste("ColocBoost returned NULL for", current_study, "- likely due to data validation failure")) + } } t32 <- Sys.time() analysis_results$separate_gwas <- res_gwas_separate @@ -872,61 +890,76 @@ qc_regional_data <- function(region_data, impute = TRUE, impute_opts = list(rcond = 0.01, R2_threshold = 0.6, minimum_ld = 5, lamb = 0.01)) { n_LD <- length(sumstat_data$LD_info) - final_sumstats <- final_LD <- NULL + final_sumstats <- list() + final_LD <- list() LD_match <- c() for (i in 1:n_LD) { LD_data <- sumstat_data$LD_info[[i]] sumstats <- sumstat_data$sumstats[[i]] for (ii in 1:length(sumstats)) { sumstat <- sumstats[[ii]] - if (nrow(sumstat$sumstats) == 0) next + conditions_sumstat <- names(sumstats)[ii] + if (is.null(conditions_sumstat) || is.na(conditions_sumstat) || conditions_sumstat == "") { + next + } + if (nrow(sumstat$sumstats) == 0) { + next + } n <- sumstat$n var_y <- sumstat$var_y - conditions_sumstat <- names(sumstats)[ii] - # Wrap processing in try-catch to handle errors for individual sumstats - result <- tryCatch({ - # Handle pip_cutoff_to_skip_sumstat: can be a single value, named vector, or unnamed vector - if (length(pip_cutoff_to_skip_sumstat) == 1 && is.null(names(pip_cutoff_to_skip_sumstat))) { - # Single value: use for all conditions - pip_cutoff_to_skip_ld <- as.numeric(pip_cutoff_to_skip_sumstat) - } else if (!is.null(names(pip_cutoff_to_skip_sumstat)) && conditions_sumstat %in% names(pip_cutoff_to_skip_sumstat)) { - # Named vector: lookup by condition name - pip_cutoff_to_skip_ld <- as.numeric(pip_cutoff_to_skip_sumstat[conditions_sumstat]) - } else if (length(pip_cutoff_to_skip_sumstat) >= ii) { - # Unnamed vector: use positional indexing - pip_cutoff_to_skip_ld <- as.numeric(pip_cutoff_to_skip_sumstat[ii]) - } else { - # Default to 0 if no match - pip_cutoff_to_skip_ld <- 0 - } - - # Handle NA values (e.g., from failed lookup) - if (is.na(pip_cutoff_to_skip_ld)) { - pip_cutoff_to_skip_ld <- 0 - } + # Handle pip_cutoff_to_skip_sumstat: can be a single value, named vector, or unnamed vector + if (length(pip_cutoff_to_skip_sumstat) == 1 && is.null(names(pip_cutoff_to_skip_sumstat))) { + # Single value: use for all conditions + pip_cutoff_to_skip_ld <- as.numeric(pip_cutoff_to_skip_sumstat) + } else if (!is.null(names(pip_cutoff_to_skip_sumstat)) && conditions_sumstat %in% names(pip_cutoff_to_skip_sumstat)) { + # Named vector: lookup by condition name + pip_cutoff_to_skip_ld <- as.numeric(pip_cutoff_to_skip_sumstat[conditions_sumstat]) + } else if (length(pip_cutoff_to_skip_sumstat) >= ii) { + # Unnamed vector: use positional indexing + pip_cutoff_to_skip_ld <- as.numeric(pip_cutoff_to_skip_sumstat[ii]) + } else { + # Default to 0 if no match + pip_cutoff_to_skip_ld <- 0 + } + + # Handle NA values (e.g., from failed lookup) + if (is.na(pip_cutoff_to_skip_ld)) { + pip_cutoff_to_skip_ld <- 0 + } - # Preprocess the input data - preprocess_results <- rss_basic_qc(sumstat$sumstats, LD_data, remove_indels = remove_indels) - sumstat$sumstats <- preprocess_results$sumstats - LD_mat <- preprocess_results$LD_mat - - # initial PIP checking - if (pip_cutoff_to_skip_ld != 0) { - pip <- susie_rss_wrapper(z = sumstat$sumstats$z, R = LD_mat, L = 1, n = n, var_y = var_y)$pip - if (pip_cutoff_to_skip_ld < 0) { - # automatically determine the cutoff to use - pip_cutoff_to_skip_ld <- 3 * 1 / nrow(LD_mat) - } + # Preprocess the input data + preprocess_results <- rss_basic_qc(sumstat$sumstats, LD_data, remove_indels = remove_indels) + sumstat$sumstats <- preprocess_results$sumstats + LD_mat <- preprocess_results$LD_mat + + # initial PIP checking + if (pip_cutoff_to_skip_ld != 0) { + pip <- susie_rss_wrapper(z = sumstat$sumstats$z, R = LD_mat, L = 1, n = n, var_y = var_y)$pip + if (pip_cutoff_to_skip_ld < 0) { + # automatically determine the cutoff to use + pip_cutoff_to_skip_ld <- 3 * 1 / nrow(LD_mat) + } if (!any(pip > pip_cutoff_to_skip_ld)) { message(paste( "Skipping follow-up analysis for sumstat study", conditions_sumstat, ". No signals above PIP threshold", pip_cutoff_to_skip_ld, "in initial model screening." )) - return(NULL) # Skip this sumstat - } else { - message(paste("Keep summary study", conditions_sumstat, ".")) - } + result <- NULL # Skip this sumstat + } else { + message(paste("Keep summary study", conditions_sumstat, ".")) + result <- list(sumstat = sumstat, LD_mat = LD_mat) + } + } else { + # Continue processing if pip_cutoff is 0 + result <- NULL # Will be set below after processing + } + + # Only continue if not skipped by PIP check + if (!is.null(result) || pip_cutoff_to_skip_ld == 0) { + if (is.null(result)) { + # Need to create result structure + result <- list(sumstat = sumstat, LD_mat = LD_mat) } # Perform quality control - remove @@ -936,9 +969,15 @@ qc_regional_data <- function(region_data, qc_results <- summary_stats_qc(sumstat$sumstats, LD_data_processed, n = n, var_y = var_y, method = qc_method) sumstat$sumstats <- qc_results$sumstats LD_mat <- qc_results$LD_mat + result$sumstat <- sumstat + result$LD_mat <- LD_mat + if (nrow(sumstat$sumstats) == 0) { + result <- NULL + } } + # Perform imputation - if (impute) { + if (impute && !is.null(result)) { # Normalize ref_panel variant IDs to match processed sumstats ref_panel_processed <- LD_data$ref_panel if (!is.null(ref_panel_processed) && "variant_id" %in% colnames(ref_panel_processed)) { @@ -951,17 +990,23 @@ qc_regional_data <- function(region_data, ) sumstat$sumstats <- impute_results$result_filter LD_mat <- impute_results$LD_mat + result$sumstat <- sumstat + result$LD_mat <- LD_mat + if (nrow(sumstat$sumstats) == 0) { + result <- NULL + } } - - # Return processed sumstat and LD_mat - return(list(sumstat = sumstat, LD_mat = LD_mat)) - }, error = function(e) { - message(paste("Error processing sumstat", conditions_sumstat, ":", conditionMessage(e))) - return(NULL) - }) + } # Skip if processing failed - if (is.null(result)) next + if (is.null(result)) { + next + } + + # Check result structure + if (!is.list(result) || !("sumstat" %in% names(result)) || !("LD_mat" %in% names(result))) { + next + } sumstat <- result$sumstat LD_mat <- result$LD_mat @@ -1006,3 +1051,4 @@ qc_regional_data <- function(region_data, sumstat_data = sumstat_data )) } + From 022e3be2867e206eff7c7cfef1c9d8462aa10d1d Mon Sep 17 00:00:00 2001 From: Kate Lawrence Date: Tue, 13 Jan 2026 12:35:47 -0800 Subject: [PATCH 4/4] add num_threads control to colocboost pipeline --- R/LD.R | 14 ++-- R/colocboost_pipeline.R | 49 +++++++++++--- R/dentist_qc.R | 1 + R/file_utils.R | 111 +++++++++++++++++-------------- R/regularized_regression.R | 3 +- R/susie_wrapper.R | 11 +-- src/dentist_iterative_impute.cpp | 36 ++++++++++ src/mr_ash.h | 20 ++++++ 8 files changed, 173 insertions(+), 72 deletions(-) diff --git a/R/LD.R b/R/LD.R index 426790ff..7d8a109c 100644 --- a/R/LD.R +++ b/R/LD.R @@ -106,8 +106,8 @@ extract_file_paths <- function(genomic_data, intersection_rows, column_to_extrac #' @importFrom dplyr select #' @importFrom vroom vroom #' @noRd -get_regional_ld_meta <- function(ld_reference_meta_file, region, complete_coverage_required = FALSE) { - genomic_data <- vroom(ld_reference_meta_file) +get_regional_ld_meta <- function(ld_reference_meta_file, region, complete_coverage_required = FALSE, num_threads = 1) { + genomic_data <- vroom(ld_reference_meta_file, num_threads = num_threads) region <- parse_region(region) # Set column names names(genomic_data) <- c("chrom", "start", "end", "path") @@ -318,9 +318,9 @@ create_combined_LD_matrix <- function(LD_matrices, variants) { #' further partitioning if needed.} #' } #' @export -load_LD_matrix <- function(LD_meta_file_path, region, extract_coordinates = NULL) { +load_LD_matrix <- function(LD_meta_file_path, region, extract_coordinates = NULL, num_threads = 1) { # Intersect LD metadata with specified regions using updated function - intersected_LD_files <- get_regional_ld_meta(LD_meta_file_path, region) + intersected_LD_files <- get_regional_ld_meta(LD_meta_file_path, region, num_threads = num_threads) # Extract file paths for LD and bim files LD_file_paths <- intersected_LD_files$intersections$LD_file_paths @@ -403,7 +403,7 @@ load_LD_matrix <- function(LD_meta_file_path, region, extract_coordinates = NULL #' @importFrom dplyr select group_by summarise #' @importFrom vroom vroom #' @export -filter_variants_by_ld_reference <- function(variant_ids, ld_reference_meta_file, keep_indel = TRUE) { +filter_variants_by_ld_reference <- function(variant_ids, ld_reference_meta_file, keep_indel = TRUE, num_threads = 1) { # Step 1: Process variant IDs into a data frame and filter out non-standard nucleotides variants_df <- do.call(rbind, lapply(strsplit(variant_ids, ":"), function(x) { data.frame(chrom = x[1], pos = as.integer(x[2]), ref = x[3], alt = x[4]) @@ -418,11 +418,11 @@ filter_variants_by_ld_reference <- function(variant_ids, ld_reference_meta_file, group_by(chrom) %>% summarise(start = min(pos), end = max(pos)) # Step 3: Call get_regional_ld_meta to get bim_file_paths - bim_file_paths <- get_regional_ld_meta(ld_reference_meta_file, region_df)$intersections$bim_file_paths + bim_file_paths <- get_regional_ld_meta(ld_reference_meta_file, region_df, num_threads = num_threads)$intersections$bim_file_paths # Step 4: Load bim files and consolidate into a single data frame bim_data <- lapply(bim_file_paths, function(path) { - bim_df <- vroom(path, col_names = FALSE) + bim_df <- vroom(path, col_names = FALSE, num_threads = num_threads) data.frame(chrom = bim_df$X1, pos = bim_df$X4, stringsAsFactors = FALSE) }) %>% do.call("rbind", .) diff --git a/R/colocboost_pipeline.R b/R/colocboost_pipeline.R index 6f4c4c5a..3d8414b7 100644 --- a/R/colocboost_pipeline.R +++ b/R/colocboost_pipeline.R @@ -35,6 +35,7 @@ #' @param sumstats_region_name_col Filter this specific column for the extract_sumstats_region_name. #' @param comment_string comment sign in the column_mapping file, default is # #' @param extract_coordinates Optional data frame with columns "chrom" and "pos" for specific coordinates extraction. +#' @param num_threads Number of threads to use. Default to use OMP_NUM_THREADS if set, otherwise use parallel::detectCores(). #' #' @return A list containing the individual_data and sumstat_data: #' individual_data contains the following components if exist @@ -91,7 +92,21 @@ load_multitask_regional_data <- function(region, # a string of chr:start-end for extract_sumstats_region_name = NULL, sumstats_region_name_col = NULL, comment_string = "#", - extract_coordinates = NULL) { + extract_coordinates = NULL, + num_threads = NULL) { + # Auto-detect number of cores if num_threads not specified + if (is.null(num_threads)) { + omp_threads <- Sys.getenv("OMP_NUM_THREADS") + if (omp_threads != "") { + num_threads <- as.integer(omp_threads) + message("num_threads not specified, using OMP_NUM_THREADS=", num_threads) + } else { + num_threads <- parallel::detectCores() + message("num_threads not specified, auto-detected ", num_threads, " cores") + } + } + message("num_threads is set to: ", num_threads) + if (is.null(genotype_list) & is.null(sumstat_path_list)) { stop("Data load error. Please make sure at least one data set (sumstat_path_list or genotype_list) exists.") } @@ -169,7 +184,7 @@ load_multitask_regional_data <- function(region, # a string of chr:start-end for # Helper function to check if phenotype file has data for the specific region # Note: The file may have data elsewhere, but we only care about this region # This is a pre-check; the actual loader will do the definitive check - check_phenotype_has_data <- function(pheno_file, region_str) { + check_phenotype_has_data <- function(pheno_file, region_str, num_threads = 1) { # File existence already checked above, so we can assume it exists here # Parse region to get chromosome and coordinates region_parts <- strsplit(region_str, ":", fixed = TRUE)[[1]] @@ -198,7 +213,7 @@ load_multitask_regional_data <- function(region, # a string of chr:start-end for cmd_output <- tryCatch( { data.table::fread(cmd = paste0("tabix -h ", pheno_file, " ", region_tabix), - sep = "auto", header = "auto") + sep = "auto", header = "auto", nThread = num_threads) }, error = function(e) NULL ) @@ -220,7 +235,7 @@ load_multitask_regional_data <- function(region, # a string of chr:start-end for } # Filter to only phenotypes with data for this specific region - has_data <- vapply(phenotype, function(p) check_phenotype_has_data(p, region), logical(1)) + has_data <- vapply(phenotype, function(p) check_phenotype_has_data(p, region, num_threads = num_threads), logical(1)) valid_indices <- which(has_data) if (length(valid_indices) == 0) { @@ -254,7 +269,8 @@ load_multitask_regional_data <- function(region, # a string of chr:start-end for extract_region_name = extract_region_name, phenotype_header = phenotype_header, region_name_col = region_name_col, - scale_residuals = scale_residuals + scale_residuals = scale_residuals, + num_threads = num_threads ) if (is.null(individual_data)) { @@ -329,7 +345,8 @@ load_multitask_regional_data <- function(region, # a string of chr:start-end for LD_info <- load_LD_matrix(LD_meta_file_path, region = association_window, - extract_coordinates = extract_coordinates + extract_coordinates = extract_coordinates, + num_threads = num_threads ) # extract sumstat information conditions <- match_LD_sumstat[[i_ld]] @@ -350,7 +367,8 @@ load_multitask_regional_data <- function(region, # a string of chr:start-end for sumstat_path = sumstat_path, column_file_path = column_file_path, n_sample = n_samples[ii], n_case = n_cases[ii], n_control = n_controls[ii], region = association_window, extract_region_name = extract_sumstats_region_name, - region_name_col = sumstats_region_name_col, comment_string = comment_string + region_name_col = sumstats_region_name_col, comment_string = comment_string, + num_threads = num_threads ) if (!("variant_id" %in% colnames(tmp$sumstats))) { tmp$sumstats <- tmp$sumstats %>% @@ -579,6 +597,7 @@ colocboost_analysis_pipeline <- function(region_data, ####### ========= QC for the region_data ======== ######## t01 <- Sys.time() + region_data <- qc_regional_data(region_data, maf_cutoff = maf_cutoff, pip_cutoff_to_skip_ind = pip_cutoff_to_skip_ind, @@ -588,6 +607,7 @@ colocboost_analysis_pipeline <- function(region_data, impute = impute, impute_opts = impute_opts ) + phenotypes_QC <- extract_contexts_studies(region_data, phenotypes_init = phenotypes_init) t02 <- Sys.time() analysis_results$computing_time$QC <- t02 - t01 @@ -682,10 +702,13 @@ colocboost_analysis_pipeline <- function(region_data, output_level = 2, ... ) }, error = function(e) { - warning(paste("Error in joint GWAS ColocBoost:", conditionMessage(e))) + message(paste("Error in joint GWAS ColocBoost:", conditionMessage(e))) return(NULL) }) t22 <- Sys.time() + if (is.null(res_gwas)) { + message("Joint GWAS ColocBoost returned NULL - no results stored") + } analysis_results$joint_gwas <- res_gwas analysis_results$computing_time$Analysis$joint_gwas <- t22 - t21 } @@ -706,11 +729,11 @@ colocboost_analysis_pipeline <- function(region_data, output_level = 2, ... ) }, error = function(e) { - warning(paste("Error in focaled ColocBoost for", current_study, ":", conditionMessage(e))) + message(paste("Error in focaled ColocBoost for", current_study, ":", conditionMessage(e))) return(NULL) }) if (is.null(res_gwas_separate[[current_study]])) { - warning(paste("ColocBoost returned NULL for", current_study, "- likely due to data validation failure")) + message(paste("ColocBoost returned NULL for", current_study, "- likely due to data validation failure")) } } t32 <- Sys.time() @@ -795,7 +818,11 @@ qc_regional_data <- function(region_data, # automatically determine the cutoff to use pip_cutoff <- 3 * 1 / ncol(res_X) } - top_model_pip <- lapply(1:ncol(res_Y), function(i) susieR::susie(res_X, res_Y[, i], L = 1)$pip) + + top_model_pip <- lapply(1:ncol(res_Y), function(i) { + susieR::susie(res_X, res_Y[, i], L = 1)$pip + }) + check_model_pip <- sapply(top_model_pip, function(pip) any(pip > pip_cutoff)) include_idx <- which(check_model_pip) if (length(include_idx) == 0) { diff --git a/R/dentist_qc.R b/R/dentist_qc.R index 984aa795..6e2f4f19 100644 --- a/R/dentist_qc.R +++ b/R/dentist_qc.R @@ -220,6 +220,7 @@ dentist_single_window <- function(zScore, LD_mat, nSample, }, warning = warning_handler ) + res <- as.data.frame(res) # Recover dups if (duprThreshold < 1.0) { diff --git a/R/file_utils.R b/R/file_utils.R index e40560c6..2ae6ded9 100644 --- a/R/file_utils.R +++ b/R/file_utils.R @@ -2,9 +2,9 @@ #' @importFrom dplyr rename #' @importFrom data.table fread #' @importFrom tools file_path_sans_ext -read_pvar <- function(pgen) { +read_pvar <- function(pgen, num_threads = 1) { pvarf <- paste0(file_path_sans_ext(pgen), ".pvar") - pvardt <- fread(pvarf, skip = "#CHROM") + pvardt <- fread(pvarf, skip = "#CHROM", nThread = num_threads) pvardt <- rename(pvardt, "chrom" = "#CHROM", "pos" = "POS", "alt" = "ALT", "ref" = "REF", "id" = "ID" @@ -15,27 +15,27 @@ read_pvar <- function(pgen) { #' @importFrom vroom vroom #' @importFrom tools file_path_sans_ext -read_bim <- function(bed) { +read_bim <- function(bed, num_threads = 1) { bimf <- paste0(file_path_sans_ext(bed), ".bim") - bim <- vroom(bimf, col_names = FALSE) + bim <- vroom(bimf, col_names = FALSE, num_threads = num_threads) colnames(bim) <- c("chrom", "id", "gpos", "pos", "a1", "a0") return(bim) } #' @importFrom vroom vroom #' @importFrom tools file_path_sans_ext -read_psam <- function(pgen) { +read_psam <- function(pgen, num_threads = 1) { psamf <- paste0(file_path_sans_ext(pgen), ".psam") - psam <- vroom(psamf) + psam <- vroom(psamf, num_threads = num_threads) colnames(psam)[1:2] <- c("FID", "IID") return(psam) } #' @importFrom vroom vroom #' @importFrom tools file_path_sans_ext -read_fam <- function(bed) { +read_fam <- function(bed, num_threads = 1) { famf <- paste0(file_path_sans_ext(bed), ".fam") - return(vroom(famf, col_names = FALSE)) + return(vroom(famf, col_names = FALSE, num_threads = num_threads)) } # open pgen/pvar PLINK 2 data format @@ -48,12 +48,12 @@ open_pgen <- function(pgenf) { } # open bed/bim/fam: A PLINK 1 .bed is a valid .pgen -open_bed <- function(bed) { +open_bed <- function(bed, num_threads = 1) { # Make sure pgenlibr is installed if (!requireNamespace("pgenlibr", quietly = TRUE)) { stop("To use this function, please install pgenlibr: https://cran.r-project.org/web/packages/pgenlibr/index.html") } - raw_s_ct <- nrow(read_fam(bed)) + raw_s_ct <- nrow(read_fam(bed, num_threads = num_threads)) return(pgenlibr::NewPgen(bed, raw_sample_ct = raw_s_ct)) } @@ -78,10 +78,10 @@ read_pgen <- function(pgen, variantidx = NULL, meanimpute = F) { #' @importFrom magrittr %>% #' @importFrom stringr str_detect -tabix_region <- function(file, region, tabix_header = "auto", target = "", target_column_index = "") { +tabix_region <- function(file, region, tabix_header = "auto", target = "", target_column_index = "", num_threads = 1) { cmd_output <- tryCatch( { - fread(cmd = paste0("tabix -h ", file, " ", region), sep = "auto", header = tabix_header) + fread(cmd = paste0("tabix -h ", file, " ", region), sep = "auto", header = tabix_header, nThread = num_threads) }, error = function(e) NULL ) @@ -129,7 +129,7 @@ NoSNPsError <- function(message) { #' @importFrom vroom vroom #' @importFrom magrittr %>% #' @export -load_genotype_region <- function(genotype, region = NULL, keep_indel = TRUE, keep_variants_path = NULL) { +load_genotype_region <- function(genotype, region = NULL, keep_indel = TRUE, keep_variants_path = NULL, num_threads = 1) { # Make sure snpStats is installed if (!requireNamespace("snpStats", quietly = TRUE)) { stop("To use this function, please install snpStats: https://bioconductor.org/packages/release/bioc/html/snpStats.html") @@ -143,10 +143,10 @@ load_genotype_region <- function(genotype, region = NULL, keep_indel = TRUE, kee # 6 columns for bim file col_types <- list(col_character(), col_character(), col_guess(), col_integer(), col_guess(), col_guess()) # Read a few lines of the bim file to check for 'chr' prefix - bim_sample <- vroom(paste0(genotype, ".bim"), n_max = 5, col_names = FALSE, col_types = col_types) + bim_sample <- vroom(paste0(genotype, ".bim"), n_max = 5, col_names = FALSE, col_types = col_types, num_threads = num_threads) chr_prefix_present <- any(grepl("^chr", bim_sample$X1)) # Read the bim file and remove 'chr' prefix if present - bim_data <- vroom(paste0(genotype, ".bim"), col_names = FALSE, col_types = col_types) + bim_data <- vroom(paste0(genotype, ".bim"), col_names = FALSE, col_types = col_types, num_threads = num_threads) if (chr_prefix_present) { bim_data$X1 <- gsub("^chr", "", bim_data$X1) } @@ -160,7 +160,6 @@ load_genotype_region <- function(genotype, region = NULL, keep_indel = TRUE, kee # Read genotype data using snpStats read.plink geno <- snpStats::read.plink(genotype, select.snps = snp_ids) - # Remove indels if specified # Remove indels if specified if (!keep_indel) { is_indel <- with(geno$map, grepl("[^ATCG]", allele.1) | grepl("[^ATCG]", allele.2) | nchar(allele.1) > 1 | nchar(allele.2) > 1) @@ -171,7 +170,7 @@ load_genotype_region <- function(genotype, region = NULL, keep_indel = TRUE, kee geno_map <- geno$map } if (!is.null(keep_variants_path)) { - keep_variants <- vroom(keep_variants_path) + keep_variants <- vroom(keep_variants_path, num_threads = num_threads) if (!("chrom" %in% names(keep_variants)) | !("pos" %in% names(keep_variants))) { keep_variants <- do.call(rbind, lapply(strsplit(format_variant_id(keep_variants[[1]]), ":", fixed = TRUE), function(x) { data.frame( @@ -198,8 +197,8 @@ load_genotype_region <- function(genotype, region = NULL, keep_indel = TRUE, kee #' @importFrom dplyr select mutate across everything #' @importFrom magrittr %>% #' @noRd -load_covariate_data <- function(covariate_path) { - return(map(covariate_path, ~ read_delim(.x, "\t", col_types = cols()) %>% +load_covariate_data <- function(covariate_path, num_threads = 1) { + return(map(covariate_path, ~ read_delim(.x, "\t", col_types = cols(), num_threads = num_threads) %>% select(-1) %>% mutate(across(everything(), as.numeric)) %>% t())) @@ -214,7 +213,7 @@ NoPhenotypeError <- function(message) { #' @importFrom dplyr filter select mutate across everything #' @importFrom magrittr %>% #' @noRd -load_phenotype_data <- function(phenotype_path, region, extract_region_name = NULL, region_name_col = NULL, tabix_header = TRUE) { +load_phenotype_data <- function(phenotype_path, region, extract_region_name = NULL, region_name_col = NULL, tabix_header = TRUE, num_threads = 1) { if (is.null(extract_region_name)) { extract_region_name <- rep(list(NULL), length(phenotype_path)) } else if (is.list(extract_region_name) && length(extract_region_name) != length(phenotype_path)) { @@ -225,7 +224,7 @@ load_phenotype_data <- function(phenotype_path, region, extract_region_name = NU # Use `map2` to iterate over `phenotype_path` and `extract_region_name` simultaneously phenotype_data <- map2(phenotype_path, extract_region_name, ~ { - tabix_data <- if (!is.null(region)) tabix_region(.x, region, tabix_header = tabix_header) else read_delim(.x, "\t", col_types = cols()) + tabix_data <- if (!is.null(region)) tabix_region(.x, region, tabix_header = tabix_header) else read_delim(.x, "\t", col_types = cols(), num_threads = num_threads) if (nrow(tabix_data) == 0) { message(paste("Phenotype file ", .x, " is empty for the specified region", if (!is.null(region)) "" else region)) return(NULL) @@ -361,9 +360,14 @@ prepare_X_matrix <- function(geno_bed, data_list, imiss_cutoff, maf_cutoff, mac_ #' @noRd add_X_residuals <- function(data_list, scale_residuals = FALSE) { # Compute residuals for X and add them to data_list + n_conditions <- length(data_list$X) + lm_res_X_list <- list() + for (idx in seq_along(data_list$X)) { + lm_res_X_list[[idx]] <- .lm.fit(x = cbind(1, data_list$covar[[idx]]), y = data_list$X[[idx]])$residuals %>% as.matrix() + } data_list <- data_list %>% mutate( - lm_res_X = map2(X, covar, ~ .lm.fit(x = cbind(1, .y), y = .x)$residuals %>% as.matrix()), + lm_res_X = lm_res_X_list, X_resid_mean = map(lm_res_X, ~ apply(.x, 2, mean)), X_resid_sd = map(lm_res_X, ~ apply(.x, 2, sd)), X_resid = map(lm_res_X, ~ { @@ -386,16 +390,20 @@ add_X_residuals <- function(data_list, scale_residuals = FALSE) { add_Y_residuals <- function(data_list, conditions, scale_residuals = FALSE) { # Compute residuals, their mean, and standard deviation, and add them to data_list # Preserve colnames from original Y (gene IDs) through the residual computation + n_conditions <- length(data_list$Y) + lm_res_list <- list() + for (idx in seq_along(data_list$Y)) { + y <- data_list$Y[[idx]] + res <- .lm.fit(x = cbind(1, data_list$covar[[idx]]), y = y)$residuals %>% as.matrix() + # Preserve colnames from original Y (gene IDs) + if (!is.null(colnames(y))) { + colnames(res) <- colnames(y) + } + lm_res_list[[idx]] <- res + } data_list <- data_list %>% mutate( - lm_res = map2(Y, covar, function(y, cov) { - res <- .lm.fit(x = cbind(1, cov), y = y)$residuals %>% as.matrix() - # Preserve colnames from original Y (gene IDs) - if (!is.null(colnames(y))) { - colnames(res) <- colnames(y) - } - return(res) - }), + lm_res = lm_res_list, Y_resid_mean = map(lm_res, ~ apply(.x, 2, mean)), Y_resid_sd = map(lm_res, ~ apply(.x, 2, sd)), Y_resid = map2(lm_res, Y, function(lm, y_orig) { @@ -474,13 +482,13 @@ load_regional_association_data <- function(genotype, # PLINK file keep_variants = NULL, phenotype_header = 4, # skip first 4 rows of transposed phenotype for chr, start, end and ID scale_residuals = FALSE, - tabix_header = TRUE) { + tabix_header = TRUE, + num_threads = 1) { ## Load genotype - geno <- load_genotype_region(genotype, association_window, keep_indel, keep_variants_path = keep_variants) + geno <- load_genotype_region(genotype, association_window, keep_indel, keep_variants_path = keep_variants, num_threads = num_threads) ## Load phenotype and covariates and perform some pre-processing - covar <- load_covariate_data(covariate) - pheno <- load_phenotype_data(phenotype, region, extract_region_name = extract_region_name, region_name_col = region_name_col, tabix_header = tabix_header) - + covar <- load_covariate_data(covariate, num_threads = num_threads) + pheno <- load_phenotype_data(phenotype, region, extract_region_name = extract_region_name, region_name_col = region_name_col, tabix_header = tabix_header, num_threads = num_threads) # Keep covariates / conditions aligned with successfully loaded phenotypes. # load_phenotype_data() returns NULL for phenotype files that are empty in # the requested region; we must drop the corresponding covariate / condition @@ -505,10 +513,13 @@ load_regional_association_data <- function(genotype, # PLINK file phenotype_header = phenotype_header, keep_samples = keep_samples ) maf_list <- setNames(lapply(data_list$X, function(x) apply(x, 2, compute_maf)), colnames(data_list$X)) + ## Get residue Y for each of condition and its mean and sd data_list <- add_Y_residuals(data_list, conditions, scale_residuals) + ## Get residue X for each of condition and its mean and sd data_list <- add_X_residuals(data_list, scale_residuals) + # Get X matrix for union of samples X <- prepare_X_matrix(geno, data_list, imiss_cutoff, maf_cutoff, mac_cutoff, xvar_cutoff) region <- if (!is.null(region)) unlist(strsplit(region, ":", fixed = TRUE)) @@ -540,8 +551,8 @@ load_regional_association_data <- function(genotype, # PLINK file #' @importFrom matrixStats colVars #' @return A list #' @export -load_regional_univariate_data <- function(...) { - dat <- load_regional_association_data(...) +load_regional_univariate_data <- function(..., num_threads = 1) { + dat <- load_regional_association_data(..., num_threads = num_threads) return(list( residual_Y = dat$residual_Y, residual_X = dat$residual_X, @@ -563,8 +574,8 @@ load_regional_univariate_data <- function(...) { #' #' @return A list #' @export -load_regional_regression_data <- function(...) { - dat <- load_regional_association_data(...) +load_regional_regression_data <- function(..., num_threads = 1) { + dat <- load_regional_association_data(..., num_threads = num_threads) return(list( Y = dat$Y, X_data = dat$X_data, @@ -605,8 +616,9 @@ pheno_list_to_mat <- function(data_list) { #' @return A list #' @export load_regional_multivariate_data <- function(matrix_y_min_complete = NULL, # when Y is saved as matrix, remove those with non-missing counts less than this cutoff - ...) { - dat <- pheno_list_to_mat(load_regional_association_data(...)) + ..., + num_threads = 1) { + dat <- pheno_list_to_mat(load_regional_association_data(..., num_threads = num_threads)) if (!is.null(matrix_y_min_complete)) { Y <- filter_Y(dat$residual_Y, matrix_y_min_complete) if (length(Y$rm_rows) > 0) { @@ -642,8 +654,8 @@ load_regional_multivariate_data <- function(matrix_y_min_complete = NULL, # when #' #' @return A list #' @export -load_regional_functional_data <- function(...) { - dat <- load_regional_association_data(...) +load_regional_functional_data <- function(..., num_threads = 1) { + dat <- load_regional_association_data(..., num_threads = num_threads) return(dat) } @@ -840,7 +852,8 @@ load_twas_weights <- function(weight_db_files, conditions = NULL, #' @importFrom magrittr %>% #' @export load_rss_data <- function(sumstat_path, column_file_path, n_sample = 0, n_case = 0, n_control = 0, region = NULL, - extract_region_name = NULL, region_name_col = NULL, comment_string = "#") { + extract_region_name = NULL, region_name_col = NULL, comment_string = "#", num_threads = 1) { + message("Loading RSS data from ", sumstat_path, " with n_sample = ", n_sample, " n_case = ", n_case, " n_control = ", n_control) # Read and preprocess column mapping if (is.null(comment_string)) { column_data <- read.table(column_file_path, @@ -860,7 +873,7 @@ load_rss_data <- function(sumstat_path, column_file_path, n_sample = 0, n_case = # Initialize sumstats variable sumstats <- NULL var_y <- NULL - sumstats <- load_tsv_region(file_path = sumstat_path, region = region, extract_region_name = extract_region_name, region_name_col = region_name_col) + sumstats <- load_tsv_region(file_path = sumstat_path, region = region, extract_region_name = extract_region_name, region_name_col = region_name_col, num_threads = num_threads) # To keep a log message n_variants <- nrow(sumstats) @@ -932,7 +945,7 @@ load_rss_data <- function(sumstat_path, column_file_path, n_sample = 0, n_case = #' @importFrom data.table fread #' @importFrom vroom vroom #' @export -load_tsv_region <- function(file_path, region = NULL, extract_region_name = NULL, region_name_col = NULL) { +load_tsv_region <- function(file_path, region = NULL, extract_region_name = NULL, region_name_col = NULL, num_threads = 1) { sumstats <- NULL cmd <- NULL @@ -973,7 +986,7 @@ load_tsv_region <- function(file_path, region = NULL, extract_region_name = NULL } } else { warning("Not a tabix-indexed gz file, loading the entire dataset.") - sumstats <- vroom(file_path) + sumstats <- vroom(file_path, num_threads = num_threads) # Apply filter if specified if (!is.null(extract_region_name) && !is.null(region_name_col)) { keep_index <- which(str_detect(sumstats[[region_name_col]], extract_region_name)) @@ -1097,9 +1110,9 @@ get_filter_lbf_index <- function(susie_obj, coverage = 0.5, size_factor = 0.5) { #' Function to load LD reference data variants #' @export #' @noRd -load_ld_snp_info <- function(ld_meta_file_path, region_of_interest) { +load_ld_snp_info <- function(ld_meta_file_path, region_of_interest, num_threads = 1) { bim_file_path <- get_regional_ld_meta(ld_meta_file_path, region_of_interest)$intersections$bim_file_paths - bim_data <- lapply(bim_file_path, function(bim_file) as.data.frame(vroom(bim_file, col_names = FALSE))) + bim_data <- lapply(bim_file_path, function(bim_file) as.data.frame(vroom(bim_file, col_names = FALSE, num_threads = num_threads))) snp_info <- setNames(lapply(bim_data, function(info_table) { # for TWAS and MR, the variance and allele_freq are not necessary if (ncol(info_table) >= 8) { diff --git a/R/regularized_regression.R b/R/regularized_regression.R index 2d21335a..247ee0d5 100644 --- a/R/regularized_regression.R +++ b/R/regularized_regression.R @@ -89,6 +89,7 @@ mr_ash_rss <- function(bhat, shat, R, var_y, n, if (is.null(var_y)) var_y <- Inf if (identical(z, numeric(0))) z <- bhat / shat # rcpp_mr_ass_rss throws error at line 269 of mr_ash.h + result <- rcpp_mr_ash_rss( bhat = bhat, shat = shat, z = z, R = R, var_y = var_y, n = n, sigma2_e = sigma2_e, @@ -98,7 +99,7 @@ mr_ash_rss <- function(bhat, shat, R, var_y, n, compute_ELBO = compute_ELBO, standardize = standardize, ncpus = ncpu ) - + return(result) } diff --git a/R/susie_wrapper.R b/R/susie_wrapper.R index f7431372..4b5b32c3 100644 --- a/R/susie_wrapper.R +++ b/R/susie_wrapper.R @@ -149,17 +149,20 @@ susie_wrapper <- function(X, y, init_L = 5, max_L = 30, l_step = 5, ...) { #' @export susie_rss_wrapper <- function(z, R, n = NULL, var_y = NULL, L = 10, max_L = 30, l_step = 5, zR_discrepancy_correction = FALSE, coverage = 0.95, ...) { + if (L == 1) { - return(susie_rss( + result <- susie_rss( z = z, R = R, var_y = var_y, n = n, L = 1, max_iter = 1, correct_zR_discrepancy = FALSE, coverage = coverage, ... - )) + ) + return(result) } if (L == max_L) { - return(susie_rss( + result <- susie_rss( z = z, R = R, var_y = var_y, n = n, L = L, correct_zR_discrepancy = zR_discrepancy_correction, coverage = coverage, ... - )) + ) + return(result) } while (TRUE) { st <- proc.time() diff --git a/src/dentist_iterative_impute.cpp b/src/dentist_iterative_impute.cpp index cc7213a1..a375f72f 100644 --- a/src/dentist_iterative_impute.cpp +++ b/src/dentist_iterative_impute.cpp @@ -12,6 +12,8 @@ #include #include #include +#include +#include // Enable C++11 via this plugin (Rcpp 0.10.3 or later) // [[Rcpp::depends(RcppArmadillo)]] @@ -89,9 +91,26 @@ void oneIteration(const arma::mat& LD_mat, const std::vector& idx, const Rcpp::Rcout << "zScore_e size: " << zScore_e.size() << std::endl; } + // DEBUG: Print thread information in oneIteration + time_t rawtime2; + struct tm * timeinfo2; + char buffer2[80]; + time(&rawtime2); + timeinfo2 = localtime(&rawtime2); + strftime(buffer2, sizeof(buffer2), "%Y-%m-%d %H:%M:%S", timeinfo2); + + const char* omp_env2 = getenv("OMP_NUM_THREADS"); + Rcpp::Rcout << "[DEBUG oneIteration " << buffer2 << "] Thread settings:" << std::endl; + Rcpp::Rcout << " OMP_NUM_THREADS env var: " << (omp_env2 ? omp_env2 : "not set") << std::endl; + Rcpp::Rcout << " ncpus parameter: " << ncpus << std::endl; + Rcpp::Rcout << " omp_get_max_threads(): " << omp_get_max_threads() << std::endl; + int nProcessors = omp_get_max_threads(); if (ncpus < nProcessors) nProcessors = ncpus; omp_set_num_threads(nProcessors); + + Rcpp::Rcout << " Setting omp_set_num_threads(" << nProcessors << ")" << std::endl; + Rcpp::Rcout << " After setting, omp_get_num_threads(): " << omp_get_num_threads() << std::endl; size_t K = std::min(static_cast(idx.size()), nSample) * probSVD; @@ -248,9 +267,26 @@ List dentist_iterative_impute(const arma::mat& LD_mat, size_t nSample, const arm } // Set number of threads for parallel processing + // DEBUG: Print thread information + time_t rawtime; + struct tm * timeinfo; + char buffer[80]; + time(&rawtime); + timeinfo = localtime(&rawtime); + strftime(buffer, sizeof(buffer), "%Y-%m-%d %H:%M:%S", timeinfo); + + const char* omp_env = getenv("OMP_NUM_THREADS"); + Rcpp::Rcout << "[DEBUG dentist_iterative_impute " << buffer << "] Thread settings:" << std::endl; + Rcpp::Rcout << " OMP_NUM_THREADS env var: " << (omp_env ? omp_env : "not set") << std::endl; + Rcpp::Rcout << " ncpus parameter: " << ncpus << std::endl; + Rcpp::Rcout << " omp_get_max_threads(): " << omp_get_max_threads() << std::endl; + int nProcessors = omp_get_max_threads(); if (ncpus < nProcessors) nProcessors = ncpus; omp_set_num_threads(nProcessors); + + Rcpp::Rcout << " Setting omp_set_num_threads(" << nProcessors << ")" << std::endl; + Rcpp::Rcout << " After setting, omp_get_num_threads(): " << omp_get_num_threads() << std::endl; size_t markerSize = zScore.size(); std::vector randOrder = generateSetOfNumbers(markerSize, seed); diff --git a/src/mr_ash.h b/src/mr_ash.h index f3cc62eb..67179b8b 100644 --- a/src/mr_ash.h +++ b/src/mr_ash.h @@ -5,6 +5,9 @@ #include #include #include +#include +#include +#include using namespace arma; using namespace std; @@ -114,9 +117,26 @@ unordered_map mr_ash_sufficient(const vec& XTy, const mat& XTX, dou int max_iter = 1e5, bool update_w0 = true, bool update_sigma = true, bool compute_ELBO = true, bool verbose = false, int ncpus = 1) { // Set the number of threads for OpenMP + // DEBUG: Print thread information + time_t rawtime; + struct tm * timeinfo; + char buffer[80]; + time(&rawtime); + timeinfo = localtime(&rawtime); + strftime(buffer, sizeof(buffer), "%Y-%m-%d %H:%M:%S", timeinfo); + + const char* omp_env = getenv("OMP_NUM_THREADS"); + std::cout << "[DEBUG mr_ash_sufficient " << buffer << "] Thread settings:" << std::endl; + std::cout << " OMP_NUM_THREADS env var: " << (omp_env ? omp_env : "not set") << std::endl; + std::cout << " ncpus parameter: " << ncpus << std::endl; + std::cout << " omp_get_max_threads(): " << omp_get_max_threads() << std::endl; + int nProcessors = omp_get_max_threads(); if (ncpus < nProcessors) nProcessors = ncpus; omp_set_num_threads(nProcessors); + + std::cout << " Setting omp_set_num_threads(" << nProcessors << ")" << std::endl; + std::cout << " After setting, omp_get_num_threads(): " << omp_get_num_threads() << std::endl; // Initialize parameters int p = XTX.n_cols;