196 changes: 133 additions & 63 deletions pairtools/cli/dedup.py
@@ -18,7 +18,6 @@

UTIL_NAME = "pairtools_dedup"


@cli.command()
@click.argument("pairs_path", type=str, required=False)

@@ -47,7 +46,7 @@
default="",
help="output file for unmapped pairs. "
"If the path ends with .gz or .lz4, the output is bgzip-/lz4c-compressed. "
"If the path is the same as in --output or -, output unmapped pairs together "
"If the path is the same as --output or -, output unmapped pairs together "
"with deduped pairs. If the path is the same as --output-dups, output "
"unmapped reads together with dups. By default, unmapped pairs are dropped.",
)
@@ -180,43 +179,43 @@
@click.option(
"--c1",
type=str,
default=pairsam_format.COLUMNS_PAIRS[1],
help=f"Chrom 1 column; default {pairsam_format.COLUMNS_PAIRS[1]}"
default="1",
help="Chrom 1 column; default 1 (1-based index or column name, e.g., 'chrom1')"
"[input format option]",
)
@click.option(
"--c2",
type=str,
default=pairsam_format.COLUMNS_PAIRS[3],
help=f"Chrom 2 column; default {pairsam_format.COLUMNS_PAIRS[3]}"
default="3",
help="Chrom 2 column; default 3 (1-based index or column name, e.g., 'chrom2')"
"[input format option]",
)
@click.option(
"--p1",
type=str,
default=pairsam_format.COLUMNS_PAIRS[2],
help=f"Position 1 column; default {pairsam_format.COLUMNS_PAIRS[2]}"
default="2",
help="Position 1 column; default 2 (1-based index or column name, e.g., 'pos1')"
"[input format option]",
)
@click.option(
"--p2",
type=str,
default=pairsam_format.COLUMNS_PAIRS[4],
help=f"Position 2 column; default {pairsam_format.COLUMNS_PAIRS[4]}"
default="4",
help="Position 2 column; default 4 (1-based index or column name, e.g., 'pos2')"
"[input format option]",
)
@click.option(
"--s1",
type=str,
default=pairsam_format.COLUMNS_PAIRS[5],
help=f"Strand 1 column; default {pairsam_format.COLUMNS_PAIRS[5]}"
default="5",
help="Strand 1 column; default 5 (1-based index or column name, e.g., 'strand1')"
"[input format option]",
)
@click.option(
"--s2",
type=str,
default=pairsam_format.COLUMNS_PAIRS[6],
help=f"Strand 2 column; default {pairsam_format.COLUMNS_PAIRS[6]}"
default="6",
help="Strand 2 column; default 6 (1-based index or column name, e.g., 'strand2')"
"[input format option]",
)
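# Illustrative sketch (assumption, not from this diff): the column options above
# accept either a numeric index or a column name, so a resolver along these lines
# presumably backs headerops.parse_column. Whether numeric strings are taken as
# 0- or 1-based is an assumption of this sketch; direct indexing makes the
# default "1" resolve to "chrom1" in the standard #columns header.
#
# def _parse_column_sketch(col, column_names):
#     if col.isnumeric():
#         return int(col)              # numeric string -> positional index
#     return column_names.index(col)   # column name -> positional index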
@click.option(
@@ -352,9 +351,22 @@ def dedup(
)


if __name__ == "__main__":
dedup()
from pairtools.lib import fileio, headerops, pairsam_format
from pairtools.lib.dedup import streaming_dedup, streaming_dedup_cython
import sys
import pathlib
import ast
from pairtools.lib.stats import PairCounter
import logging

from .._logging import get_logger

# Set logging level to DEBUG
logging.basicConfig(level=logging.DEBUG)
logger = get_logger()

import click

def dedup_py(
pairs_path,
@@ -383,7 +395,6 @@ def dedup_py(
n_proc,
**kwargs,
):

sep = ast.literal_eval('"""' + sep + '"""')
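# For example, a backslash-t typed on the command line arrives as the two
# characters '\\t' and becomes a real tab here:
# ast.literal_eval('"""' + '\\t' + '"""') == '\t'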
send_header_to_dedup = send_header_to in ["both", "dedup"]
send_header_to_dup = send_header_to in ["both", "dups"]
@@ -422,27 +433,24 @@ def dedup_py(
)
keep_parent_id = True

# generate empty PairCounter if stats output is requested:
if output_stats:
filter = kwargs.get("filter", None)
# Define filters and their properties
first_filter_name = "no_filter" # default filter name for full output
first_filter_name = "no_filter"
if filter is not None and len(filter) > 0:
first_filter_name = filter[0].split(":", 1)[0]
if len(filter) > 1 and not kwargs.get("yaml", False):
logger.warning(
f"Non-YAML output reports only the first filter: {first_filter_name}"
)

filter = dict([f.split(":", 1) for f in filter])
else:
filter = None
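# Filters arrive as "NAME:EXPRESSION" strings; e.g. a hypothetical
# --filter "unique:pair_type=='UU'" becomes {"unique": "pair_type=='UU'"}
# in the dict built above.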

out_stat = PairCounter(
bytile_dups=bytile_dups,
filters=filter,
startup_code=kwargs.get("startup_code", ""), # for evaluation of filters
type_cast=kwargs.get("type_cast", ()), # for evaluation of filters
startup_code=kwargs.get("startup_code", ""),
type_cast=kwargs.get("type_cast", ()),
engine=kwargs.get("engine", "pandas"),
)
else:
@@ -502,38 +510,42 @@ def dedup_py(
):
outstream_unmapped.writelines((l + "\n" for l in header))

column_names = headerops.extract_column_names(header)
column_names = headerops.canonicalize_columns(headerops.extract_column_names(header))
extra_cols1 = []
extra_cols2 = []
if extra_col_pair is not None:
for col1, col2 in extra_col_pair:
extra_cols1.append(column_names[col1] if col1.isnumeric() else col1)
extra_cols2.append(column_names[col2] if col2.isnumeric() else col2)
idx1 = headerops.parse_column(col1, column_names)
idx2 = headerops.parse_column(col2, column_names)
extra_cols1.append(column_names[idx1])
extra_cols2.append(column_names[idx2])
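# e.g. a hypothetical "--extra-col-pair mapq1 mapq2" adds one more pair of
# columns that must also match before two reads are treated as duplicates.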

c1_name = column_names[headerops.parse_column(c1, column_names)]
c2_name = column_names[headerops.parse_column(c2, column_names)]
p1_name = column_names[headerops.parse_column(p1, column_names)]
p2_name = column_names[headerops.parse_column(p2, column_names)]
s1_name = column_names[headerops.parse_column(s1, column_names)]
s2_name = column_names[headerops.parse_column(s2, column_names)]
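# Illustrative example, assuming the standard pairs header
# "#columns: readID chrom1 pos1 chrom2 pos2 strand1 strand2":
# the defaults resolve to c1_name == "chrom1", p1_name == "pos1",
# s1_name == "strand1", and likewise for the mate-2 columns.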

if backend == "cython":
# warnings.warn(
# "'cython' backend is deprecated and provided only"
# " for backwards compatibility",
# DeprecationWarning,
# )
extra_cols1 = [column_names.index(col) for col in extra_cols1]
extra_cols2 = [column_names.index(col) for col in extra_cols2]
c1 = column_names.index(c1)
c2 = column_names.index(c2)
p1 = column_names.index(p1)
p2 = column_names.index(p2)
s1 = column_names.index(s1)
s2 = column_names.index(s2)
extra_cols1 = [headerops.parse_column(col, column_names) for col in extra_cols1]
extra_cols2 = [headerops.parse_column(col, column_names) for col in extra_cols2]
c1_idx = headerops.parse_column(c1, column_names)
c2_idx = headerops.parse_column(c2, column_names)
p1_idx = headerops.parse_column(p1, column_names)
p2_idx = headerops.parse_column(p2, column_names)
s1_idx = headerops.parse_column(s1, column_names)
s2_idx = headerops.parse_column(s2, column_names)
streaming_dedup_cython(
method,
max_mismatch,
sep,
c1,
c2,
p1,
p2,
s1,
s2,
c1_idx,
c2_idx,
p1_idx,
p2_idx,
s1_idx,
s2_idx,
extra_cols1,
extra_cols2,
unmapped_chrom,
@@ -546,41 +558,93 @@
keep_parent_id,
)
elif backend in ("scipy", "sklearn"):
streaming_dedup(
print(f"Starting streaming_dedup with backend: {backend}")
print(f"Input file: {pairs_path}")
print(f"Column names from header: {column_names}")
print(f"Specified columns: c1={c1_name}, p1={p1_name}, s1={s1_name}, c2={c2_name}, p2={p2_name}, s2={s2_name}")

deduped_chunks = streaming_dedup(
in_stream=body_stream,
colnames=column_names,
chunksize=chunksize,
carryover=carryover,
method=method,
mark_dups=mark_dups,
max_mismatch=max_mismatch,
extra_col_pairs=list(extra_col_pair),
extra_col_pairs=list(extra_col_pair) if extra_col_pair else [],
keep_parent_id=keep_parent_id,
unmapped_chrom=unmapped_chrom,
outstream=outstream,
outstream_dups=outstream_dups,
outstream_unmapped=outstream_unmapped,
out_stat=out_stat,
backend=backend,
n_proc=n_proc,
c1=c1,
c2=c2,
p1=p1,
p2=p2,
s1=s1,
s2=s2,
c1=c1_name,
c2=c2_name,
p1=p1_name,
p2=p2_name,
s1=s1_name,
s2=s2_name,
unmapped_chrom=unmapped_chrom,
)
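# streaming_dedup is expected to yield pandas DataFrames (roughly chunksize rows
# each) with a boolean "duplicate" column added; the carryover rows at the end of
# one chunk are presumably re-examined with the next chunk so duplicates that
# straddle a chunk boundary are still caught. (Behavior inferred from the
# parameter names; not spelled out in this diff.)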

chunk_count = 0
for df_chunk in deduped_chunks:
chunk_count += 1
if not df_chunk.empty:
print(f"Chunk {chunk_count} columns: {df_chunk.columns.tolist()}")
print(f"Chunk {chunk_count} size: {len(df_chunk)}")
print(f"Chunk {chunk_count} deduped rows: {len(df_chunk[~df_chunk['duplicate']])}")
print(f"Chunk {chunk_count} duplicate rows: {len(df_chunk[df_chunk['duplicate']])}")
print(f"Chunk {chunk_count} unmapped rows: {len(df_chunk[(df_chunk[c1_name] == unmapped_chrom) | (df_chunk[c2_name] == unmapped_chrom)])}")
print(f"Chunk {chunk_count} head:\n{df_chunk.head().to_string()}")

output_columns = column_names
if keep_parent_id:
output_columns = column_names + ["parent_readID"]

if outstream:
df_chunk[~df_chunk["duplicate"]][output_columns].to_csv(
outstream,
sep="\t",
header=False,
index=False,
na_rep="!",
)
if outstream_dups and "duplicate" in df_chunk.columns:
df_chunk[df_chunk["duplicate"]][output_columns].to_csv(
outstream_dups,
sep="\t",
header=False,
index=False,
na_rep="!",
)
if outstream_unmapped:
unmapped_mask = (df_chunk[c1_name] == unmapped_chrom) | (df_chunk[c2_name] == unmapped_chrom)
df_chunk[unmapped_mask][output_columns].to_csv(
outstream_unmapped,
sep="\t",
header=False,
index=False,
na_rep="!",
)
if out_stat:
for _, row in df_chunk.iterrows():
out_stat.add_pair(
row[c1_name],
row[p1_name],
row[s1_name],
row[c2_name],
row[p2_name],
row[s2_name],
"DD" if row["duplicate"] else row["pair_type"],
unmapped_chrom=unmapped_chrom,
)
print(f"Processed {chunk_count} chunks from streaming_dedup")
else:
raise ValueError("Unknown backend")

# save statistics to a file if it was requested:
if out_stat:
out_stat.save(
out_stats_stream,
yaml=kwargs.get("yaml", False), # format as yaml
filter=(
first_filter_name if not kwargs.get("yaml", False) else None
), # output only the first filter if non-YAML output
yaml=kwargs.get("yaml", False),
filter=(first_filter_name if not kwargs.get("yaml", False) else None),
)

if bytile_dups:
@@ -604,3 +668,9 @@ def dedup_py(

if out_stats_stream:
out_stats_stream.close()

if output_bytile_stats:
out_bytile_stats_stream.close()

if __name__ == "__main__":
dedup()
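# Example invocation (file names are placeholders):
#   pairtools dedup --mark-dups \
#       --output deduped.pairs.gz \
#       --output-dups dups.pairs.gz \
#       --output-unmapped unmapped.pairs.gz \
#       --output-stats dedup.stats \
#       input.pairs.gz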