Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
247 changes: 184 additions & 63 deletions samtools-0.1.19/bam_plcmd.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,35 @@ static inline int printw(int c, FILE *fp)
return 0;
}


/**
* @section DESCRIPTION
* A partial port of "printw"-function without the flexibility to change
* output stream. This function seems useless for the case where its
* used because stdout was hardcoded.
*
* @author Youri Hoogstrate
*
* @date 2014-mar-03
*/
static inline int kprintw(int c, kstring_t *stdout_buffer_ref)
{
char buf[16];
int l, x;
if (c == 0) {
return kputc('0', stdout_buffer_ref);//replaces: return fputc('0', fp);
}
for (l = 0, x = c < 0? -c : c; x > 0; x /= 10) buf[l++] = x%10 + '0';
if (c < 0) buf[l++] = '-';
buf[l] = 0;
for (x = 0; x < l/2; ++x) {
int y = buf[x];
buf[x] = buf[l-1-x]; buf[l-1-x] = y;
}
kputs(buf, stdout_buffer_ref);//replaces: fputs(buf, fp);
return 0;
}

static inline void pileup_seq(const bam_pileup1_t *p, int pos, int ref_len, const char *ref)
{
int j;
Expand Down Expand Up @@ -62,6 +91,58 @@ static inline void pileup_seq(const bam_pileup1_t *p, int pos, int ref_len, cons
if (p->is_tail) putchar('$');
}


/**
* @section DESCRIPTION
* A port of "pileup_seq"-function able to handle the kstring_t
* structure.
*
* @author Youri Hoogstrate
*
* @date 2014-mar-03
*/
static inline void kpileup_seq(const bam_pileup1_t *p, int pos, int ref_len, const char *ref, kstring_t *stdout_buffer_ref)
{
int j;
if (p->is_head) {
kputc('^', stdout_buffer_ref);//replaces: putchar('^');
kputc((p->b->core.qual > 93? 126 : p->b->core.qual + 33), stdout_buffer_ref);//replaces: putchar(p->b->core.qual > 93? 126 : p->b->core.qual + 33);
}
if (!p->is_del) {
int c = bam_nt16_rev_table[bam1_seqi(bam1_seq(p->b), p->qpos)];
if (ref) {
int rb = pos < ref_len? ref[pos] : 'N';
if (c == '=' || bam_nt16_table[c] == bam_nt16_table[rb]) c = bam1_strand(p->b)? ',' : '.';
else c = bam1_strand(p->b)? tolower(c) : toupper(c);
} else {
if (c == '=') c = bam1_strand(p->b)? ',' : '.';
else c = bam1_strand(p->b)? tolower(c) : toupper(c);
}
kputc(c, stdout_buffer_ref);// replaces: putchar(c);
}
else {
kputc(p->is_refskip? (bam1_strand(p->b)? '<' : '>') : '*', stdout_buffer_ref);//replaces: putchar(p->is_refskip? (bam1_strand(p->b)? '<' : '>') : '*');
}
if (p->indel > 0) {
kputc('+', stdout_buffer_ref);// replaces: putchar('+');
kprintw(p->indel, stdout_buffer_ref); //replaces: printw(p->indel, stdout);
for (j = 1; j <= p->indel; ++j) {
int c = bam_nt16_rev_table[bam1_seqi(bam1_seq(p->b), p->qpos + j)];
kputc(putchar(bam1_strand(p->b)? tolower(c) : toupper(c)), stdout_buffer_ref);//replaces: putchar(bam1_strand(p->b)? tolower(c) : toupper(c));
}
} else if (p->indel < 0) {
kprintw(p->indel, stdout_buffer_ref); //replaces: printw(p->indel, stdout);
for (j = 1; j <= -p->indel; ++j) {
int c = (ref && (int)pos+j < ref_len)? ref[pos+j] : 'N';
kputc(bam1_strand(p->b)? tolower(c) : toupper(c), stdout_buffer_ref);//replaces: putchar(bam1_strand(p->b)? tolower(c) : toupper(c));
}
}
if (p->is_tail) {
kputc('$', stdout_buffer_ref);//replaces: putchar('$');
}
}


#include <assert.h>
#include "bam2bcf.h"
#include "sample.h"
Expand Down Expand Up @@ -253,6 +334,9 @@ void * mpileup_kern (
int max_indel_depth = params->max_indel_depth;
const void *rghash = params->rghash;

kstring_t stdout_buffer;
stdout_buffer.l = stdout_buffer.m = 0; stdout_buffer.s = 0;

static pthread_mutex_t write_lock = PTHREAD_MUTEX_INITIALIZER;

n_plp = calloc(n, sizeof(int));
Expand All @@ -276,7 +360,7 @@ void * mpileup_kern (

iter = bam_mplp_init(n, mplp_func, (void**)data);
bam_mplp_set_maxcnt(iter, 8000);

memset(&buf, 0, sizeof(kstring_t));
memset(&bc, 0, sizeof(bcf_call_t));
while (bam_mplp_auto(iter, &tid, &pos, n_plp, plp) > 0) {
Expand Down Expand Up @@ -323,63 +407,97 @@ void * mpileup_kern (
}
}
} else {
pthread_mutex_lock(&write_lock);
printf("%s\t%d\t%c", h->target_name[tid], pos + 1, (ref && pos < ref_len)? ref[pos] : 'N');
pthread_mutex_unlock(&write_lock);
for (i = 0; i < n; ++i) {
int j, cnt;
for (j = cnt = 0; j < n_plp[i]; ++j) {
const bam_pileup1_t *p = plp[i] + j;
if (bam1_qual(p->b)[p->qpos] >= conf->min_baseQ) ++cnt;
}
pthread_mutex_lock(&write_lock);
printf("\t%d\t", cnt);
if (n_plp[i] == 0) {
printf("*\t*"); // FIXME: printf() is very slow...
if (conf->flag & MPLP_PRINT_POS) printf("\t*");
pthread_mutex_unlock(&write_lock);
} else {
pthread_mutex_unlock(&write_lock);
for (j = 0; j < n_plp[i]; ++j) {
const bam_pileup1_t *p = plp[i] + j;
if (bam1_qual(p->b)[p->qpos] >= conf->min_baseQ)
pileup_seq(plp[i] + j, pos, ref_len, ref);
}
pthread_mutex_lock(&write_lock);
putchar('\t');
for (j = 0; j < n_plp[i]; ++j) {
const bam_pileup1_t *p = plp[i] + j;
int c = bam1_qual(p->b)[p->qpos];
if (c >= conf->min_baseQ) {
c = c + 33 < 126? c + 33 : 126;
putchar(c);
}
}
if (conf->flag & MPLP_PRINT_MAPQ) {
putchar('\t');
for (j = 0; j < n_plp[i]; ++j) {
int c = plp[i][j].b->core.qual + 33;
if (c > 126) c = 126;
putchar(c);
}
}
if (conf->flag & MPLP_PRINT_POS) {
putchar('\t');
for (j = 0; j < n_plp[i]; ++j) {
if (j > 0) putchar(',');
printf("%d", plp[i][j].qpos + 1); // FIXME: printf() is very slow...
}
}
pthread_mutex_unlock(&write_lock);
}
}
pthread_mutex_lock(&write_lock);
putchar('\n');
pthread_mutex_unlock(&write_lock);
}
// fprintf (stderr,"[/bam_mplp_auto]\n");
}//end While
free(n_plp); free(plp); free(buf.s);
/**
* @section DESCRIPTION
* This region contained a locking error; different parts of
* writing to stdout were locked separatly. As consequence,
* parts of the output of different threads could end up
* together. This error is issued at:
* https://github.com/mydatascience/parallel-mpileup/issues/1
*
* The problem is that the suffix is extended during the for
* loop(s) and should therefore be put in a temp string.
* To avoid the allocation of i strings I make use of the
* kstring_t structure which is able to "grow" by only
* allocating memory if the string size increases.
*
* @date 2014-mar-03
*/

stdout_buffer.l = 0;

ksprintf(&stdout_buffer, "%s\t%d\t%c", h->target_name[tid], pos + 1, (ref && pos < ref_len)? ref[pos] : 'N');// replaces: printf("%s\t%d\t%c", h->target_name[tid], pos + 1, (ref && pos < ref_len)? ref[pos] : 'N');

for (i = 0; i < n; ++i) {
int j, cnt;
for (j = cnt = 0; j < n_plp[i]; ++j) {
const bam_pileup1_t *p = plp[i] + j;
if (bam1_qual(p->b)[p->qpos] >= conf->min_baseQ) {
++cnt;
}
}

ksprintf(&stdout_buffer, "\t%d\t", cnt);

if (n_plp[i] == 0) {
ksprintf(&stdout_buffer, "*\t*", cnt);// replaces: printf("*\t*"); // FIXME: printf() is very slow...

if (conf->flag & MPLP_PRINT_POS)
{
ksprintf(&stdout_buffer, "\t*", cnt);// replaces: printf("\t*");
}
} else {
for (j = 0; j < n_plp[i]; ++j) {
const bam_pileup1_t *p = plp[i] + j;
if (bam1_qual(p->b)[p->qpos] >= conf->min_baseQ)
{
kpileup_seq(plp[i] + j, pos, ref_len, ref, &stdout_buffer);// replaces: pileup_seq(plp[i] + j, pos, ref_len, ref);
}
}


kputc('\t', &stdout_buffer);// replaces: putchar('\t');
for (j = 0; j < n_plp[i]; ++j) {
const bam_pileup1_t *p = plp[i] + j;
int c = bam1_qual(p->b)[p->qpos];
if (c >= conf->min_baseQ) {
c = c + 33 < 126? c + 33 : 126;
kputc(c, &stdout_buffer);// replaces: putchar(c);
}
}
if (conf->flag & MPLP_PRINT_MAPQ) {
kputc('\t', &stdout_buffer);// replaces: putchar('\t');
for (j = 0; j < n_plp[i]; ++j) {
int c = plp[i][j].b->core.qual + 33;
if (c > 126) c = 126;
kputc(c, &stdout_buffer);// replaces: putchar(c);
}
}
if (conf->flag & MPLP_PRINT_POS) {
kputc('\t', &stdout_buffer);// replaces: putchar('\t');
for (j = 0; j < n_plp[i]; ++j) {
if (j > 0){
kputc(',', &stdout_buffer);// replaces: putchar(',');
}
ksprintf(&stdout_buffer, "%d", plp[i][j].qpos + 1);//replaces: printf("%d", plp[i][j].qpos + 1); // FIXME: printf() is very slow...
}
}
}
}

// IF you want to use locks, use only one lock per output-line! Otherwise output of different lines becomes mixed.
pthread_mutex_lock(&write_lock);
puts(stdout_buffer.s);
pthread_mutex_unlock(&write_lock);

stdout_buffer.l = 0;
}
// fprintf (stderr,"[/bam_mplp_auto]\n");
}//end While

/** ------------------------- end fix --------------------------- */

free(n_plp); free(plp); free(buf.s); free(stdout_buffer.s);
free(bc.PL); free(bcr);
free(params);
for (i = 0; i < gplp.n; ++i) free(gplp.plp[i]);
Expand Down Expand Up @@ -569,13 +687,11 @@ static int mpileup(mplp_conf_t *conf, int n, char **fn)
kernel_args->rghash = rghash;

pthread_create(&threads[i], NULL, mpileup_kern, kernel_args);

}

for (i = 0; i < conf->num_threads; ++i) {
pthread_join(threads[i], NULL);
}

}
if (conf->flag & MPLP_GLF) {
bcf_write_queue_destroy(bp, bh);
}
Expand Down Expand Up @@ -790,8 +906,13 @@ int bam_mpileup(int argc, char *argv[])
if ( read_file_list(file_list,&nfiles,&fn) ) return 1;
mpileup(&mplp,nfiles,fn);
for (c=0; c<nfiles; c++) free(fn[c]);
free(fn);
} else mpileup(&mplp, argc - optind, argv + optind);
free(fn);
} else
{
mpileup(&mplp, argc - optind, argv + optind);
}


if (mplp.rghash) bcf_str2id_thorough_destroy(mplp.rghash);
free(mplp.reg); free(mplp.pl_list);
if (mplp.fai) fai_destroy(mplp.fai);
Expand Down